From a055897a2ac26334b0ae218958b145ab4735ac72 Mon Sep 17 00:00:00 2001 From: ynaim94 Date: Thu, 26 May 2022 10:54:17 -0400 Subject: [PATCH 1/7] init show downstream dbt --- Dockerfile | 6 ++- bin/docker_run.sh | 3 ++ python/etl/commands.py | 100 ++++++++++++++++++++++++----------------- python/etl/load.py | 1 + requirements.txt | 1 + 5 files changed, 68 insertions(+), 43 deletions(-) diff --git a/Dockerfile b/Dockerfile index c807fb30..904c87c7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -28,11 +28,13 @@ RUN yum install -y \ tmux \ vim-minimal +RUN amazon-linux-extras install docker + # Run as non-priviledged user "arthur". RUN useradd --comment 'Arthur ETL' --user-group --create-home arthur && \ mkdir --parent /opt/data-warehouse "/opt/local/$PROJ_NAME" /opt/src/arthur-redshift-etl && \ - chown -R arthur.arthur /opt/* -USER arthur + chown -R arthur.arthur /opt/* /var/* + # The .bashrc will ensure the virutal environment is activated when running interactive shells. COPY --chown=arthur:arthur docker/* /home/arthur/ diff --git a/bin/docker_run.sh b/bin/docker_run.sh index 1113c69c..84e556ba 100755 --- a/bin/docker_run.sh +++ b/bin/docker_run.sh @@ -166,6 +166,8 @@ case "$action" in docker run --rm --interactive --tty \ --env ARTHUR_DEFAULT_PREFIX="$target_env" \ --env DATA_WAREHOUSE_CONFIG="/opt/data-warehouse/$config_path" \ + --env DBT_ROOT="$data_warehouse_path/dbt" \ + --env DBT_PROFILES_DIR="${HOME}/.dbt/profiles.yml" \ --sysctl net.ipv4.tcp_keepalive_time=300 \ --sysctl net.ipv4.tcp_keepalive_intvl=60 \ --sysctl net.ipv4.tcp_keepalive_probes=9 \ @@ -173,6 +175,7 @@ case "$action" in --volume ~/.ssh:/home/arthur/.ssh:ro \ --volume "$data_warehouse_path:/opt/data-warehouse" \ --volume "$(pwd):/opt/src/arthur-redshift-etl" \ + --volume /var/run/docker.sock:/var/run/docker.sock \ $publish_arg \ $profile_arg \ "arthur-redshift-etl:$tag" diff --git a/python/etl/commands.py b/python/etl/commands.py index a36e9837..93a7507c 100644 --- a/python/etl/commands.py +++ b/python/etl/commands.py @@ -12,6 +12,7 @@ import shlex import sys import uuid +from collections import namedtuple from contextlib import contextmanager from datetime import datetime, timedelta, timezone from typing import Iterable, List, Optional @@ -42,6 +43,7 @@ import etl.templates import etl.unload import etl.validate +from etl.dbt import DBTProject from etl.errors import ETLError, ETLSystemError, InvalidArgumentError from etl.text import join_with_single_quotes from etl.util import croak, isoformat_datetime_string @@ -198,11 +200,11 @@ def submit_step(cluster_id, sub_command): "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ - etl.config.etl_tmp_dir("venv/bin/arthur.py"), - "--config", - etl.config.etl_tmp_dir("config"), - ] - + remaining, + etl.config.etl_tmp_dir("venv/bin/arthur.py"), + "--config", + etl.config.etl_tmp_dir("config"), + ] + + remaining, }, } ], @@ -438,7 +440,7 @@ def add_standard_arguments(parser: argparse.ArgumentParser, option_names: Iterab metavar="N", type=int, help="set max number of parallel loads to N (overrides " - "'resources.RedshiftCluster.max_concurrency')", + "'resources.RedshiftCluster.max_concurrency')", ) if "wlm-query-slots" in options: options.discard("wlm-query-slots") @@ -448,7 +450,7 @@ def add_standard_arguments(parser: argparse.ArgumentParser, option_names: Iterab metavar="N", type=int, help="set the number of Redshift WLM query slots used for transformations" - " (overrides 'resources.RedshiftCluster.wlm_query_slots')", + " (overrides 
'resources.RedshiftCluster.wlm_query_slots')", ) if "statement-timeout" in options: options.discard("statement-timeout") @@ -458,8 +460,8 @@ def add_standard_arguments(parser: argparse.ArgumentParser, option_names: Iterab metavar="MILLISECS", type=int, help="set the timeout before canceling a statement in Redshift. This time includes planning," - " queueing in WLM, and execution time. (overrides " - "'resources.RedshiftCluster.statement_timeout')", + " queueing in WLM, and execution time. (overrides " + "'resources.RedshiftCluster.statement_timeout')", ) if "skip-copy" in options: options.discard("skip-copy") @@ -474,9 +476,9 @@ def add_standard_arguments(parser: argparse.ArgumentParser, option_names: Iterab parser.add_argument( "--continue-from", help="skip forward in execution until the specified relation, then work forward from it" - " (the special token '*' is allowed to signify continuing from the first relation," - " use ':transformations' as the argument to continue from the first transformation,)" - " otherwise specify an exact relation or source name)", + " (the special token '*' is allowed to signify continuing from the first relation," + " use ':transformations' as the argument to continue from the first transformation,)" + " otherwise specify an exact relation or source name)", ) if "pattern" in options: options.discard("pattern") @@ -513,7 +515,7 @@ class SubCommand(abc.ABC): uses_monitor = False def __init__( - self, name: str, help_: str, description: str, aliases: Optional[List[str]] = None + self, name: str, help_: str, description: str, aliases: Optional[List[str]] = None ) -> None: self.name = name self.help = help_ @@ -576,7 +578,7 @@ def location(args, default_scheme=None): raise ETLSystemError("scheme invalid") def find_relation_descriptions( - self, args, default_scheme=None, required_relation_selector=None, return_all=False + self, args, default_scheme=None, required_relation_selector=None, return_all=False ): """ Most commands need to collect file sets and create relation descriptions around those. @@ -636,7 +638,7 @@ def add_arguments(self, parser): "-f", "--force", help="destructively initialize the referenced database regardless" - " of whether it looks like a validation database", + " of whether it looks like a validation database", default=False, action="store_true", ) @@ -892,7 +894,7 @@ def add_arguments(self, parser): "type", choices=["CTAS", "VIEW", "update", "check-only"], help="pick whether to create table designs for 'CTAS' or 'VIEW' relations" - " , update the current relation, or check the current designs", + " , update the current relation, or check the current designs", ) # Note that patterns must follow the choice of CTAS, VIEW, update etc. 
add_standard_arguments(parser, ["pattern"]) @@ -1002,7 +1004,7 @@ def add_arguments(self, parser): const="manifest-only", dest="extractor", help="skip extraction and go straight to creating manifest files, " - "implied default for static sources", + "implied default for static sources", ) parser.add_argument( "-k", @@ -1076,15 +1078,15 @@ def add_arguments(self, parser): parser.add_argument( "--concurrent-extract", help="watch DynamoDB for extract step completion and load source tables as extracts" - " finish assuming another Arthur in this prefix is running extract" - " (default: %(default)s)", + " finish assuming another Arthur in this prefix is running extract" + " (default: %(default)s)", default=False, action="store_true", ) parser.add_argument( "--without-staging-schemas", help="do NOT do all the work in hidden schemas and publish to standard names on" - " completion (default: use staging schemas)", + " completion (default: use staging schemas)", default=True, action="store_false", dest="use_staging_schemas", @@ -1094,7 +1096,7 @@ def add_arguments(self, parser): default=True, action="store_false", help="Skip publishing staging schemas and keep result of the load step in" - " staging (default: publish schemas from staging)", + " staging (default: publish schemas from staging)", dest="publish_staging_schemas", ) parser.add_argument( @@ -1127,7 +1129,7 @@ def callback(self, args): ) statement_timeout = args.statement_timeout if statement_timeout is None and etl.config.is_config_set( - "resources.RedshiftCluster.statement_timeout" + "resources.RedshiftCluster.statement_timeout" ): statement_timeout = etl.config.get_config_int("resources.RedshiftCluster.statement_timeout") wlm_query_slots = args.wlm_query_slots or etl.config.get_config_int( @@ -1179,13 +1181,13 @@ def add_arguments(self, parser): action="store_true", default=False, help="skip rebuilding relations that depend on the selected ones" - " (leaves warehouse in inconsistent state, for debugging only)", + " (leaves warehouse in inconsistent state, for debugging only)", ) parser.add_argument( "--include-immediate-views", action="store_true", help="include views that are downstream of selected relations without any CTAS before" - " (this is the default and only useful with '--only-selected', for debugging only)", + " (this is the default and only useful with '--only-selected', for debugging only)", ) group = parser.add_mutually_exclusive_group() group.add_argument( @@ -1194,7 +1196,7 @@ def add_arguments(self, parser): default=False, dest="use_staging_schemas", help="do all the work using hidden schemas (default: do not use staging schemas," - " note this is the opposite of 'load' command)", + " note this is the opposite of 'load' command)", ) group.add_argument( "--into-schema", @@ -1224,7 +1226,7 @@ def callback(self, args): ) statement_timeout = args.statement_timeout if statement_timeout is None and etl.config.is_config_set( - "resources.RedshiftCluster.statement_timeout" + "resources.RedshiftCluster.statement_timeout" ): statement_timeout = etl.config.get_config_int("resources.RedshiftCluster.statement_timeout") wlm_query_slots = args.wlm_query_slots or etl.config.get_config_int( @@ -1264,7 +1266,7 @@ def add_arguments(self, parser): parser.add_argument( "--only-selected", help="only load data into selected relations" - " (leaves warehouse in inconsistent state, for debugging only, default: %(default)s)", + " (leaves warehouse in inconsistent state, for debugging only, default: %(default)s)", default=False, 
action="store_true", ) @@ -1274,7 +1276,7 @@ def add_arguments(self, parser): default=None, type=isoformat_datetime_string, help="require recent successful extract events for all selected source relations " - "after UTC time TIME (or, by default, don't require extract events)", + "after UTC time TIME (or, by default, don't require extract events)", ) parser.add_argument( "--vacuum", @@ -1287,7 +1289,7 @@ def callback(self, args): relations = self.find_relation_descriptions(args, default_scheme="s3", return_all=True) statement_timeout = args.statement_timeout if statement_timeout is None and etl.config.is_config_set( - "resources.RedshiftCluster.statement_timeout" + "resources.RedshiftCluster.statement_timeout" ): statement_timeout = etl.config.get_config_int("resources.RedshiftCluster.statement_timeout") wlm_query_slots = args.wlm_query_slots or etl.config.get_config_int( @@ -1762,19 +1764,35 @@ def add_arguments(self, parser): action="store_true", help="show list of dependents (upstream) for every relation", ) + group.add_argument( + "--with-dbt", + action="store_true", + help="show list of dependents (upstream) for every relation", + ) def callback(self, args): - dw_config = etl.config.get_dw_config() - relations = self.find_relation_descriptions( - args, required_relation_selector=dw_config.required_in_full_load_selector, return_all=True - ) - etl.load.show_downstream_dependents( - relations, - args.pattern, - continue_from=args.continue_from, - with_dependencies=args.with_dependencies, - with_dependents=args.with_dependents, - ) + # dw_config = etl.config.get_dw_config() + # relations = self.find_relation_descriptions( + # args, required_relation_selector=dw_config.required_in_full_load_selector, return_all=True + # ) + # relations = etl.load.show_downstream_dependents( + # relations, + # args.pattern, + # continue_from=args.continue_from, + # with_dependencies=args.with_dependencies, + # with_dependents=args.with_dependents, + # ) + # + # if not args.with_dbt: + # return + # + # dbt_model_identifiers = [relation.identifier.split('.') for relation in relations] + + print(os.environ['DBT_ROOT'], os.environ['DBT_PROFILES_DIR']) + dbt_project = DBTProject(os.environ['DBT_ROOT'], os.environ['DBT_PROFILES_DIR']) + ret = dbt_project.build_image() + # print([l for l in ret]) + dbt_project.run_cmd("ls") class ShowUpstreamDependenciesCommand(SubCommand): @@ -1929,7 +1947,7 @@ def add_arguments(self, parser): action="append", choices=["step", "event", "elapsed", "rowcount"], help="select output column (in addition to target and timestamp)," - " use multiple times so add more columns", + " use multiple times so add more columns", ) parser.add_argument("etl_id", help="pick particular ETL from the past", nargs="?") diff --git a/python/etl/load.py b/python/etl/load.py index 6c16d347..1c9b30e9 100644 --- a/python/etl/load.py +++ b/python/etl/load.py @@ -1533,6 +1533,7 @@ def show_downstream_dependents( width=width_dep, ) ) + return selected_relations def show_upstream_dependencies(relations: Sequence[RelationDescription], selector: TableSelector): diff --git a/requirements.txt b/requirements.txt index 42798c9e..0f16898c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,3 +13,4 @@ tabulate==0.8.9 termcolor==1.1.0 tqdm==4.64.0 watchtower==3.0.0 +docker From 65cd330640b9688c5013e4557b01e41c894c0345 Mon Sep 17 00:00:00 2001 From: ynaim94 Date: Thu, 26 May 2022 17:41:46 -0400 Subject: [PATCH 2/7] Arthur container uses root to access docker daemon --- Dockerfile | 24 +++++++++--------------- 
bin/arthur.sh | 4 ++-- bin/docker_run.sh | 14 +++++++------- 3 files changed, 18 insertions(+), 24 deletions(-) diff --git a/Dockerfile b/Dockerfile index 904c87c7..51e30342 100644 --- a/Dockerfile +++ b/Dockerfile @@ -30,18 +30,13 @@ RUN yum install -y \ RUN amazon-linux-extras install docker -# Run as non-priviledged user "arthur". -RUN useradd --comment 'Arthur ETL' --user-group --create-home arthur && \ - mkdir --parent /opt/data-warehouse "/opt/local/$PROJ_NAME" /opt/src/arthur-redshift-etl && \ - chown -R arthur.arthur /opt/* /var/* - +RUN mkdir --parent /opt/data-warehouse "/opt/local/$PROJ_NAME" /opt/src/arthur-redshift-etl # The .bashrc will ensure the virutal environment is activated when running interactive shells. -COPY --chown=arthur:arthur docker/* /home/arthur/ +COPY docker/* /root # Install code under /opt/local/ (although it is under /tmp/ on an EC2 host). -COPY --chown=arthur:arthur \ - bin/create_validation_credentials \ +COPY bin/create_validation_credentials \ bin/release_version.sh \ bin/send_health_check.sh \ bin/sync_env.sh \ @@ -55,13 +50,13 @@ RUN python3 -m venv "/opt/local/$PROJ_NAME/venv" && \ python3 -m pip install --requirement /tmp/requirements-all.txt --disable-pip-version-check --no-cache-dir # Create an empty .pgpass file to help with the format of this file. -RUN echo '# Format to set password when updating users: *:5439:*::' > /home/arthur/.pgpass \ - && chmod go= /home/arthur/.pgpass +RUN echo '# Format to set password when updating users: *:5439:*::' > /root/.pgpass \ + && chmod go= /root/.pgpass # Note that at runtime we (can or may) mount the local directory here. # But we want to be independent of the source so copy everything over once. WORKDIR /opt/src/arthur-redshift-etl -COPY --chown=arthur:arthur ./ ./ +COPY ./ ./ # We run this here once in case somebody overrides the entrypoint. RUN source "/opt/local/$PROJ_NAME/venv/bin/activate" && \ @@ -76,15 +71,14 @@ EXPOSE 8086 # The data warehouse (with schemas, config, etc.) will be mounted here: WORKDIR /opt/data-warehouse -ENTRYPOINT ["/home/arthur/entrypoint.sh"] +ENTRYPOINT ["/root/entrypoint.sh"] CMD ["/bin/bash", "--login"] # Second stage, overriding entrypoint in the image to not have to override it everytime # we use the Arthur image in a remote environment. 
entrypoint_remote.sh will fetch config # and schema files from S3 FROM local AS remote -COPY --chown=arthur:arthur \ - bin/bootstrap_remote_dw.sh \ +COPY bin/bootstrap_remote_dw.sh \ "/opt/local/$PROJ_NAME/bin/" ENV PROJ_NAME=$PROJ_NAME -ENTRYPOINT ["/home/arthur/entrypoint_remote.sh"] +ENTRYPOINT ["/root/entrypoint_remote.sh"] diff --git a/bin/arthur.sh b/bin/arthur.sh index 74d0466a..9a3549fe 100755 --- a/bin/arthur.sh +++ b/bin/arthur.sh @@ -136,7 +136,7 @@ docker run --rm --interactive --tty \ --sysctl net.ipv4.tcp_keepalive_time=300 \ --sysctl net.ipv4.tcp_keepalive_intvl=60 \ --sysctl net.ipv4.tcp_keepalive_probes=9 \ - --volume ~/.aws:/home/arthur/.aws \ - --volume ~/.ssh:/home/arthur/.ssh:ro \ + --volume ~/.aws:/root/.aws \ + --volume ~/.ssh:/root/.ssh:ro \ --volume "$data_warehouse_path:/opt/data-warehouse" \ "$docker_image:$tag" diff --git a/bin/docker_run.sh b/bin/docker_run.sh index 84e556ba..d2ebca4c 100755 --- a/bin/docker_run.sh +++ b/bin/docker_run.sh @@ -154,7 +154,7 @@ case "$action" in docker run --rm --tty \ --env ARTHUR_DEFAULT_PREFIX="$target_env" \ --env DATA_WAREHOUSE_CONFIG="/opt/data-warehouse/$config_path" \ - --volume ~/.aws:/home/arthur/.aws \ + --volume ~/.aws:/root/.aws \ --volume "$data_warehouse_path":/opt/data-warehouse \ $profile_arg \ "arthur-redshift-etl:$tag" \ @@ -171,8 +171,8 @@ case "$action" in --sysctl net.ipv4.tcp_keepalive_time=300 \ --sysctl net.ipv4.tcp_keepalive_intvl=60 \ --sysctl net.ipv4.tcp_keepalive_probes=9 \ - --volume ~/.aws:/home/arthur/.aws \ - --volume ~/.ssh:/home/arthur/.ssh:ro \ + --volume ~/.aws:/root/.aws \ + --volume ~/.ssh:/root/.ssh:ro \ --volume "$data_warehouse_path:/opt/data-warehouse" \ --volume "$(pwd):/opt/src/arthur-redshift-etl" \ --volume /var/run/docker.sock:/var/run/docker.sock \ @@ -189,8 +189,8 @@ case "$action" in --sysctl net.ipv4.tcp_keepalive_time=300 \ --sysctl net.ipv4.tcp_keepalive_intvl=60 \ --sysctl net.ipv4.tcp_keepalive_probes=9 \ - --volume ~/.aws:/home/arthur/.aws \ - --volume ~/.ssh:/home/arthur/.ssh:ro \ + --volume ~/.aws:/root/.aws \ + --volume ~/.ssh:/root/.ssh:ro \ --volume "$data_warehouse_path:/opt/data-warehouse" \ $publish_arg \ $profile_arg \ @@ -202,7 +202,7 @@ case "$action" in docker run --rm --tty \ --env ARTHUR_DEFAULT_PREFIX="$target_env" \ --env DATA_WAREHOUSE_CONFIG="/opt/data-warehouse/$config_path" \ - --volume ~/.aws:/home/arthur/.aws \ + --volume ~/.aws:/root/.aws \ --volume "$data_warehouse_path:/opt/data-warehouse" \ $profile_arg \ "arthur-redshift-etl:$tag" \ @@ -214,7 +214,7 @@ case "$action" in docker run --rm --tty \ --env ARTHUR_DEFAULT_PREFIX="$target_env" \ --env DATA_WAREHOUSE_CONFIG="/opt/data-warehouse/$config_path" \ - --volume ~/.aws:/home/arthur/.aws \ + --volume ~/.aws:/root/.aws \ --volume "$data_warehouse_path":/opt/data-warehouse \ $profile_arg \ "arthur-redshift-etl:$tag" \ From 6b6b094ed8e8706433baad62ff0118ba248449ae Mon Sep 17 00:00:00 2001 From: ynaim94 Date: Thu, 26 May 2022 18:33:10 -0400 Subject: [PATCH 3/7] Arthur show downstream dependents outputs downstream dbt tables --- python/etl/commands.py | 42 +++++++++++++------------- python/etl/dbt.py | 67 ++++++++++++++++++++++++++++++++++++++++++ python/etl/load.py | 2 +- 3 files changed, 89 insertions(+), 22 deletions(-) create mode 100644 python/etl/dbt.py diff --git a/python/etl/commands.py b/python/etl/commands.py index 93a7507c..5eabc46c 100644 --- a/python/etl/commands.py +++ b/python/etl/commands.py @@ -43,7 +43,7 @@ import etl.templates import etl.unload import etl.validate -from etl.dbt import 
DBTProject +from etl.dbt import DBTProject, DbtModelIdentifier from etl.errors import ETLError, ETLSystemError, InvalidArgumentError from etl.text import join_with_single_quotes from etl.util import croak, isoformat_datetime_string @@ -1771,28 +1771,28 @@ def add_arguments(self, parser): ) def callback(self, args): - # dw_config = etl.config.get_dw_config() - # relations = self.find_relation_descriptions( - # args, required_relation_selector=dw_config.required_in_full_load_selector, return_all=True - # ) - # relations = etl.load.show_downstream_dependents( - # relations, - # args.pattern, - # continue_from=args.continue_from, - # with_dependencies=args.with_dependencies, - # with_dependents=args.with_dependents, - # ) - # - # if not args.with_dbt: - # return - # - # dbt_model_identifiers = [relation.identifier.split('.') for relation in relations] - - print(os.environ['DBT_ROOT'], os.environ['DBT_PROFILES_DIR']) + dw_config = etl.config.get_dw_config() + relations = self.find_relation_descriptions( + args, required_relation_selector=dw_config.required_in_full_load_selector, return_all=True + ) + relations = etl.load.show_downstream_dependents( + relations, + args.pattern, + continue_from=args.continue_from, + with_dependencies=args.with_dependencies, + with_dependents=args.with_dependents, + ) + + if not args.with_dbt: + return + + dbt_model_identifiers = [DbtModelIdentifier(*relation.identifier.split('.')) for relation in relations] dbt_project = DBTProject(os.environ['DBT_ROOT'], os.environ['DBT_PROFILES_DIR']) + dbt_downstream = ' '.join([f"{parent}+" for parent in dbt_project.show_downstream_dbt_parents(dbt_model_identifiers)]) + ret = dbt_project.build_image() - # print([l for l in ret]) - dbt_project.run_cmd("ls") + print(dbt_project.run_cmd( + f"dbt list -t dev --exclude redshift --output name --resource-type model -s {dbt_downstream}")) class ShowUpstreamDependenciesCommand(SubCommand): diff --git a/python/etl/dbt.py b/python/etl/dbt.py new file mode 100644 index 00000000..18b6b213 --- /dev/null +++ b/python/etl/dbt.py @@ -0,0 +1,67 @@ +import os +import re +from collections import namedtuple +from typing import Sequence +import docker + +DbtModelIdentifier = namedtuple('DbtModelIdentifier', ['schema', 'table']) + + +class DBTProject: + + def __init__(self, dbt_project_root, dbt_profiles_dir): + self.dbt_root = dbt_project_root + self.dbt_profiles_dir = dbt_profiles_dir + self.local_dbt_path = 'dbt' + self.client = docker.from_env() + self.tag = 'arthur_dbt:latest' + + def build_image(self): + return self.client.api.build( + path='dbt', + tag=self.tag, + dockerfile='Dockerfile', + quiet=False, + # nocache=False + ) + + def run_cmd(self, cmd): + try: + return self.client.containers.run(self.tag, + cmd, + volumes={self.dbt_root: {'bind': '/dbt', 'mode': 'rw'}, + self.dbt_profiles_dir: {'bind': '/root/.dbt/profiles.yml', + 'mode': 'ro'}}, + stderr=True, + stdout=True, + ).decode("utf-8") + except docker.errors.ContainerError as exc: + print(exc.container.logs()) + raise + + @staticmethod + def get_files_in_path(path, file_types=None, prefix=""): + for root, dirs, files in os.walk(path): + for file in files: + if (not file_types or file.split('.')[-1] in file_types) and file.startswith(prefix): + yield (root, file) + + def show_downstream_dbt_parents(self, dbt_model_indentifiers: Sequence[DbtModelIdentifier]): + dbt_sql_files = self.get_files_in_path(self.local_dbt_path, file_types=('sql')) + db_source_regex = r"db_source\(\s*'(.*)'\s*,\s*'(.*)'\s*\)" + + for model_path, sql_file_path 
in dbt_sql_files: + # print(model_path, sql_file_path) + with open(os.path.join(model_path, sql_file_path), "r") as f: + sql_file = f.read() + db_sources = re.findall(db_source_regex, sql_file) + # print(db_sources) + for db_source in db_sources: + schema, table = db_source + for dmi in dbt_model_indentifiers: + if dmi.schema == schema and dmi.table == table: + yield sql_file_path.rstrip('.sql') + + + + diff --git a/python/etl/load.py b/python/etl/load.py index 1c9b30e9..9dbd214f 100644 --- a/python/etl/load.py +++ b/python/etl/load.py @@ -1426,7 +1426,7 @@ def show_downstream_dependents( continue_from: Optional[str] = None, with_dependencies: Optional[bool] = False, with_dependents: Optional[bool] = False, -) -> None: +) -> List[RelationDescription]: """ List the execution order of loads or updates. From a775623bd68f402491831abafef6662075e3ba95 Mon Sep 17 00:00:00 2001 From: ynaim94 Date: Fri, 27 May 2022 13:08:30 -0400 Subject: [PATCH 4/7] Render DBT list output --- Dockerfile | 2 + bin/docker_run.sh | 6 ++- python/etl/commands.py | 83 +++++++++++++++++++++------------------ python/etl/dbt.py | 89 ++++++++++++++++++++++++++++++------------ python/etl/load.py | 2 +- requirements.txt | 2 +- 6 files changed, 119 insertions(+), 65 deletions(-) diff --git a/Dockerfile b/Dockerfile index 51e30342..937a805b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -30,6 +30,8 @@ RUN yum install -y \ RUN amazon-linux-extras install docker +# TODO(youssef): We should use a non root user, but the containers need access the docker socket which is owned by root + RUN mkdir --parent /opt/data-warehouse "/opt/local/$PROJ_NAME" /opt/src/arthur-redshift-etl # The .bashrc will ensure the virutal environment is activated when running interactive shells. diff --git a/bin/docker_run.sh b/bin/docker_run.sh index d2ebca4c..d8d9e4f2 100755 --- a/bin/docker_run.sh +++ b/bin/docker_run.sh @@ -175,7 +175,7 @@ case "$action" in --volume ~/.ssh:/root/.ssh:ro \ --volume "$data_warehouse_path:/opt/data-warehouse" \ --volume "$(pwd):/opt/src/arthur-redshift-etl" \ - --volume /var/run/docker.sock:/var/run/docker.sock \ + --volume /var/run/docker.sock:/var/run/docker.sock:ro \ $publish_arg \ $profile_arg \ "arthur-redshift-etl:$tag" @@ -186,12 +186,16 @@ case "$action" in docker run --rm --interactive --tty \ --env ARTHUR_DEFAULT_PREFIX="$target_env" \ --env DATA_WAREHOUSE_CONFIG="/opt/data-warehouse/$config_path" \ + --env DBT_ROOT="$data_warehouse_path/dbt" \ + --env DBT_PROFILES_DIR="${HOME}/.dbt/profiles.yml" \ --sysctl net.ipv4.tcp_keepalive_time=300 \ --sysctl net.ipv4.tcp_keepalive_intvl=60 \ --sysctl net.ipv4.tcp_keepalive_probes=9 \ --volume ~/.aws:/root/.aws \ --volume ~/.ssh:/root/.ssh:ro \ --volume "$data_warehouse_path:/opt/data-warehouse" \ + --volume "$(pwd):/opt/src/arthur-redshift-etl" \ + --volume /var/run/docker.sock:/var/run/docker.sock:ro \ $publish_arg \ $profile_arg \ "arthur-redshift-etl:$tag" diff --git a/python/etl/commands.py b/python/etl/commands.py index 5eabc46c..1378e561 100644 --- a/python/etl/commands.py +++ b/python/etl/commands.py @@ -12,7 +12,6 @@ import shlex import sys import uuid -from collections import namedtuple from contextlib import contextmanager from datetime import datetime, timedelta, timezone from typing import Iterable, List, Optional @@ -43,7 +42,7 @@ import etl.templates import etl.unload import etl.validate -from etl.dbt import DBTProject, DbtModelIdentifier +from etl.dbt import DbtModelIdentifier, DBTProject from etl.errors import ETLError, ETLSystemError, InvalidArgumentError 
from etl.text import join_with_single_quotes from etl.util import croak, isoformat_datetime_string @@ -200,11 +199,11 @@ def submit_step(cluster_id, sub_command): "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ - etl.config.etl_tmp_dir("venv/bin/arthur.py"), - "--config", - etl.config.etl_tmp_dir("config"), - ] - + remaining, + etl.config.etl_tmp_dir("venv/bin/arthur.py"), + "--config", + etl.config.etl_tmp_dir("config"), + ] + + remaining, }, } ], @@ -440,7 +439,7 @@ def add_standard_arguments(parser: argparse.ArgumentParser, option_names: Iterab metavar="N", type=int, help="set max number of parallel loads to N (overrides " - "'resources.RedshiftCluster.max_concurrency')", + "'resources.RedshiftCluster.max_concurrency')", ) if "wlm-query-slots" in options: options.discard("wlm-query-slots") @@ -450,7 +449,7 @@ def add_standard_arguments(parser: argparse.ArgumentParser, option_names: Iterab metavar="N", type=int, help="set the number of Redshift WLM query slots used for transformations" - " (overrides 'resources.RedshiftCluster.wlm_query_slots')", + " (overrides 'resources.RedshiftCluster.wlm_query_slots')", ) if "statement-timeout" in options: options.discard("statement-timeout") @@ -460,8 +459,8 @@ def add_standard_arguments(parser: argparse.ArgumentParser, option_names: Iterab metavar="MILLISECS", type=int, help="set the timeout before canceling a statement in Redshift. This time includes planning," - " queueing in WLM, and execution time. (overrides " - "'resources.RedshiftCluster.statement_timeout')", + " queueing in WLM, and execution time. (overrides " + "'resources.RedshiftCluster.statement_timeout')", ) if "skip-copy" in options: options.discard("skip-copy") @@ -476,9 +475,9 @@ def add_standard_arguments(parser: argparse.ArgumentParser, option_names: Iterab parser.add_argument( "--continue-from", help="skip forward in execution until the specified relation, then work forward from it" - " (the special token '*' is allowed to signify continuing from the first relation," - " use ':transformations' as the argument to continue from the first transformation,)" - " otherwise specify an exact relation or source name)", + " (the special token '*' is allowed to signify continuing from the first relation," + " use ':transformations' as the argument to continue from the first transformation,)" + " otherwise specify an exact relation or source name)", ) if "pattern" in options: options.discard("pattern") @@ -515,7 +514,7 @@ class SubCommand(abc.ABC): uses_monitor = False def __init__( - self, name: str, help_: str, description: str, aliases: Optional[List[str]] = None + self, name: str, help_: str, description: str, aliases: Optional[List[str]] = None ) -> None: self.name = name self.help = help_ @@ -578,7 +577,7 @@ def location(args, default_scheme=None): raise ETLSystemError("scheme invalid") def find_relation_descriptions( - self, args, default_scheme=None, required_relation_selector=None, return_all=False + self, args, default_scheme=None, required_relation_selector=None, return_all=False ): """ Most commands need to collect file sets and create relation descriptions around those. 
@@ -638,7 +637,7 @@ def add_arguments(self, parser): "-f", "--force", help="destructively initialize the referenced database regardless" - " of whether it looks like a validation database", + " of whether it looks like a validation database", default=False, action="store_true", ) @@ -894,7 +893,7 @@ def add_arguments(self, parser): "type", choices=["CTAS", "VIEW", "update", "check-only"], help="pick whether to create table designs for 'CTAS' or 'VIEW' relations" - " , update the current relation, or check the current designs", + " , update the current relation, or check the current designs", ) # Note that patterns must follow the choice of CTAS, VIEW, update etc. add_standard_arguments(parser, ["pattern"]) @@ -1004,7 +1003,7 @@ def add_arguments(self, parser): const="manifest-only", dest="extractor", help="skip extraction and go straight to creating manifest files, " - "implied default for static sources", + "implied default for static sources", ) parser.add_argument( "-k", @@ -1078,15 +1077,15 @@ def add_arguments(self, parser): parser.add_argument( "--concurrent-extract", help="watch DynamoDB for extract step completion and load source tables as extracts" - " finish assuming another Arthur in this prefix is running extract" - " (default: %(default)s)", + " finish assuming another Arthur in this prefix is running extract" + " (default: %(default)s)", default=False, action="store_true", ) parser.add_argument( "--without-staging-schemas", help="do NOT do all the work in hidden schemas and publish to standard names on" - " completion (default: use staging schemas)", + " completion (default: use staging schemas)", default=True, action="store_false", dest="use_staging_schemas", @@ -1096,7 +1095,7 @@ def add_arguments(self, parser): default=True, action="store_false", help="Skip publishing staging schemas and keep result of the load step in" - " staging (default: publish schemas from staging)", + " staging (default: publish schemas from staging)", dest="publish_staging_schemas", ) parser.add_argument( @@ -1129,7 +1128,7 @@ def callback(self, args): ) statement_timeout = args.statement_timeout if statement_timeout is None and etl.config.is_config_set( - "resources.RedshiftCluster.statement_timeout" + "resources.RedshiftCluster.statement_timeout" ): statement_timeout = etl.config.get_config_int("resources.RedshiftCluster.statement_timeout") wlm_query_slots = args.wlm_query_slots or etl.config.get_config_int( @@ -1181,13 +1180,13 @@ def add_arguments(self, parser): action="store_true", default=False, help="skip rebuilding relations that depend on the selected ones" - " (leaves warehouse in inconsistent state, for debugging only)", + " (leaves warehouse in inconsistent state, for debugging only)", ) parser.add_argument( "--include-immediate-views", action="store_true", help="include views that are downstream of selected relations without any CTAS before" - " (this is the default and only useful with '--only-selected', for debugging only)", + " (this is the default and only useful with '--only-selected', for debugging only)", ) group = parser.add_mutually_exclusive_group() group.add_argument( @@ -1196,7 +1195,7 @@ def add_arguments(self, parser): default=False, dest="use_staging_schemas", help="do all the work using hidden schemas (default: do not use staging schemas," - " note this is the opposite of 'load' command)", + " note this is the opposite of 'load' command)", ) group.add_argument( "--into-schema", @@ -1226,7 +1225,7 @@ def callback(self, args): ) statement_timeout = args.statement_timeout 
if statement_timeout is None and etl.config.is_config_set( - "resources.RedshiftCluster.statement_timeout" + "resources.RedshiftCluster.statement_timeout" ): statement_timeout = etl.config.get_config_int("resources.RedshiftCluster.statement_timeout") wlm_query_slots = args.wlm_query_slots or etl.config.get_config_int( @@ -1266,7 +1265,7 @@ def add_arguments(self, parser): parser.add_argument( "--only-selected", help="only load data into selected relations" - " (leaves warehouse in inconsistent state, for debugging only, default: %(default)s)", + " (leaves warehouse in inconsistent state, for debugging only, default: %(default)s)", default=False, action="store_true", ) @@ -1276,7 +1275,7 @@ def add_arguments(self, parser): default=None, type=isoformat_datetime_string, help="require recent successful extract events for all selected source relations " - "after UTC time TIME (or, by default, don't require extract events)", + "after UTC time TIME (or, by default, don't require extract events)", ) parser.add_argument( "--vacuum", @@ -1289,7 +1288,7 @@ def callback(self, args): relations = self.find_relation_descriptions(args, default_scheme="s3", return_all=True) statement_timeout = args.statement_timeout if statement_timeout is None and etl.config.is_config_set( - "resources.RedshiftCluster.statement_timeout" + "resources.RedshiftCluster.statement_timeout" ): statement_timeout = etl.config.get_config_int("resources.RedshiftCluster.statement_timeout") wlm_query_slots = args.wlm_query_slots or etl.config.get_config_int( @@ -1786,13 +1785,21 @@ def callback(self, args): if not args.with_dbt: return - dbt_model_identifiers = [DbtModelIdentifier(*relation.identifier.split('.')) for relation in relations] - dbt_project = DBTProject(os.environ['DBT_ROOT'], os.environ['DBT_PROFILES_DIR']) - dbt_downstream = ' '.join([f"{parent}+" for parent in dbt_project.show_downstream_dbt_parents(dbt_model_identifiers)]) + dbt_model_identifiers = [ + DbtModelIdentifier(*relation.identifier.split(".")) for relation in relations + ] + dbt_project = DBTProject(os.environ["DBT_ROOT"], os.environ["DBT_PROFILES_DIR"]) + dbt_downstream_parents = " ".join( + [f"{parent}+" for parent in dbt_project.show_downstream_dbt_parents(dbt_model_identifiers)] + ) - ret = dbt_project.build_image() - print(dbt_project.run_cmd( - f"dbt list -t dev --exclude redshift --output name --resource-type model -s {dbt_downstream}")) + dbt_project.build_image() + dbt_stdout = dbt_project.run_cmd( + f"dbt list -t dev --exclude redshift --output json " + f" --resource-type model -s {dbt_downstream_parents}" + ) + dbt_relations = dbt_project.parse_dbt_run_stdout(dbt_stdout) + dbt_project.render_dbt_list(dbt_relations) class ShowUpstreamDependenciesCommand(SubCommand): @@ -1947,7 +1954,7 @@ def add_arguments(self, parser): action="append", choices=["step", "event", "elapsed", "rowcount"], help="select output column (in addition to target and timestamp)," - " use multiple times so add more columns", + " use multiple times so add more columns", ) parser.add_argument("etl_id", help="pick particular ETL from the past", nargs="?") diff --git a/python/etl/dbt.py b/python/etl/dbt.py index 18b6b213..c27c3c53 100644 --- a/python/etl/dbt.py +++ b/python/etl/dbt.py @@ -1,67 +1,108 @@ +import json +import logging import os import re +import time from collections import namedtuple from typing import Sequence + import docker -DbtModelIdentifier = namedtuple('DbtModelIdentifier', ['schema', 'table']) +logger = logging.getLogger(__name__) 
+logger.addHandler(logging.NullHandler()) +DbtModelIdentifier = namedtuple("DbtModelIdentifier", ["schema", "table"]) + +DBTRelation = namedtuple("DBTRelation", ["name", "depends_on", "type", "is_required"]) -class DBTProject: +class DBTProject: def __init__(self, dbt_project_root, dbt_profiles_dir): self.dbt_root = dbt_project_root self.dbt_profiles_dir = dbt_profiles_dir - self.local_dbt_path = 'dbt' + self.local_dbt_path = "dbt" self.client = docker.from_env() - self.tag = 'arthur_dbt:latest' + self.tag = "arthur_dbt:latest" def build_image(self): - return self.client.api.build( - path='dbt', - tag=self.tag, - dockerfile='Dockerfile', - quiet=False, - # nocache=False + logging.info("Building DBT image") + img = self.client.api.build( + path="dbt", tag=self.tag, dockerfile="Dockerfile", quiet=False, nocache=False ) + time.sleep(5) # The image is not immediately available to pull + return img def run_cmd(self, cmd): try: - return self.client.containers.run(self.tag, - cmd, - volumes={self.dbt_root: {'bind': '/dbt', 'mode': 'rw'}, - self.dbt_profiles_dir: {'bind': '/root/.dbt/profiles.yml', - 'mode': 'ro'}}, - stderr=True, - stdout=True, - ).decode("utf-8") + logging.info(f"Executing inside dbt container {self.tag}: $ {cmd}") + return self.client.containers.run( + self.tag, + cmd, + volumes={ + self.dbt_root: {"bind": "/dbt", "mode": "rw"}, + self.dbt_profiles_dir: {"bind": "/root/.dbt/profiles.yml", "mode": "ro"}, + }, + stderr=True, + stdout=True, + auto_remove=True, + ).decode("utf-8") except docker.errors.ContainerError as exc: print(exc.container.logs()) raise @staticmethod def get_files_in_path(path, file_types=None, prefix=""): - for root, dirs, files in os.walk(path): + for root, _, files in os.walk(path): for file in files: - if (not file_types or file.split('.')[-1] in file_types) and file.startswith(prefix): + if (not file_types or file.split(".")[-1] in file_types) and file.startswith(prefix): yield (root, file) def show_downstream_dbt_parents(self, dbt_model_indentifiers: Sequence[DbtModelIdentifier]): - dbt_sql_files = self.get_files_in_path(self.local_dbt_path, file_types=('sql')) + dbt_sql_files = self.get_files_in_path(self.local_dbt_path, file_types=("sql")) db_source_regex = r"db_source\(\s*'(.*)'\s*,\s*'(.*)'\s*\)" for model_path, sql_file_path in dbt_sql_files: - # print(model_path, sql_file_path) with open(os.path.join(model_path, sql_file_path), "r") as f: sql_file = f.read() db_sources = re.findall(db_source_regex, sql_file) - # print(db_sources) for db_source in db_sources: schema, table = db_source for dmi in dbt_model_indentifiers: if dmi.schema == schema and dmi.table == table: - yield sql_file_path.rstrip('.sql') + yield os.path.splitext(sql_file_path)[0] + def parse_dbt_run_stdout(self, res): + res_list = res.strip().split("\n") + relations = [] + for e in res_list: + try: + d = json.loads(e) + except json.decoder.JSONDecodeError: + continue + d["depends_on"] = [node.split(".")[-1] for node in d["depends_on"]["nodes"]] + d["type"] = d["config"]["materialized"].upper() + d["is_required"] = "required" in d["config"]["tags"] + relations.append(DBTRelation(d["name"], d["depends_on"], d["type"], d["is_required"])) + return relations + def render_dbt_list(self, dbt_relations, with_dependencies=False, with_dependents=False): + current_index = {relation.name: i + 1 for i, relation in enumerate(dbt_relations)} + width_selected = max(len(name) for name in current_index) + line_template = ( + "{relation.name:{width}s}" + " # {relation.type} index={index:4d}" + " 
flag={flag:9s}" + " is_required={relation.is_required}" + ) + for relation in dbt_relations: + print( + line_template.format( + flag="DBT", + index=current_index[relation.name], + relation=relation, + width=width_selected, + ) + ) + return diff --git a/python/etl/load.py b/python/etl/load.py index 9dbd214f..fce6edef 100644 --- a/python/etl/load.py +++ b/python/etl/load.py @@ -1441,7 +1441,7 @@ def show_downstream_dependents( relations, selector, include_dependents=True, continue_from=continue_from ) if not selected_relations: - return + return selected_relations directly_selected_relations = etl.relation.find_matches(selected_relations, selector) selected = frozenset(relation.identifier for relation in directly_selected_relations) diff --git a/requirements.txt b/requirements.txt index 0f16898c..ed4b7f61 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ arrow==1.2.2 boto3==1.21.42 botocore~=1.24 +docker==5.0.3 funcy==1.17 jmespath==0.10.0 jsonschema==3.2.0 @@ -13,4 +14,3 @@ tabulate==0.8.9 termcolor==1.1.0 tqdm==4.64.0 watchtower==3.0.0 -docker From 9417766b04a20ebfe2eef206b1ad4afdbb6bbbb9 Mon Sep 17 00:00:00 2001 From: ynaim94 Date: Mon, 6 Jun 2022 17:00:28 -0400 Subject: [PATCH 5/7] Use Arthur User --- Dockerfile | 33 ++++++++++++++++++++++----------- bin/arthur.sh | 4 ++-- bin/docker_run.sh | 14 +++++++------- docker/entrypoint.sh | 2 ++ python/etl/dbt.py | 2 +- 5 files changed, 34 insertions(+), 21 deletions(-) diff --git a/Dockerfile b/Dockerfile index 937a805b..3eae208b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,19 +26,29 @@ RUN yum install -y \ python3-devel \ tar \ tmux \ - vim-minimal + vim-minimal \ + sudo \ + passwd RUN amazon-linux-extras install docker -# TODO(youssef): We should use a non root user, but the containers need access the docker socket which is owned by root +# Run as non-priviledged user "arthur". +RUN useradd --comment 'Arthur ETL' --user-group --create-home arthur && \ + passwd -d arthur && \ + mkdir --parent /opt/data-warehouse "/opt/local/$PROJ_NAME" /opt/src/arthur-redshift-etl && \ + chown -R arthur.arthur /opt/* -RUN mkdir --parent /opt/data-warehouse "/opt/local/$PROJ_NAME" /opt/src/arthur-redshift-etl +RUN echo "arthur:arthur" | chpasswd +RUN usermod -aG wheel arthur + +USER arthur # The .bashrc will ensure the virutal environment is activated when running interactive shells. -COPY docker/* /root +COPY --chown=arthur:arthur docker/* /home/arthur/ # Install code under /opt/local/ (although it is under /tmp/ on an EC2 host). -COPY bin/create_validation_credentials \ +COPY --chown=arthur:arthur \ + bin/create_validation_credentials \ bin/release_version.sh \ bin/send_health_check.sh \ bin/sync_env.sh \ @@ -52,13 +62,13 @@ RUN python3 -m venv "/opt/local/$PROJ_NAME/venv" && \ python3 -m pip install --requirement /tmp/requirements-all.txt --disable-pip-version-check --no-cache-dir # Create an empty .pgpass file to help with the format of this file. -RUN echo '# Format to set password when updating users: *:5439:*::' > /root/.pgpass \ - && chmod go= /root/.pgpass +RUN echo '# Format to set password when updating users: *:5439:*::' > /home/arthur/.pgpass \ + && chmod go= /home/arthur/.pgpass # Note that at runtime we (can or may) mount the local directory here. # But we want to be independent of the source so copy everything over once. WORKDIR /opt/src/arthur-redshift-etl -COPY ./ ./ +COPY --chown=arthur:arthur ./ ./ # We run this here once in case somebody overrides the entrypoint. 
RUN source "/opt/local/$PROJ_NAME/venv/bin/activate" && \ @@ -73,14 +83,15 @@ EXPOSE 8086 # The data warehouse (with schemas, config, etc.) will be mounted here: WORKDIR /opt/data-warehouse -ENTRYPOINT ["/root/entrypoint.sh"] +ENTRYPOINT ["/home/arthur/entrypoint.sh"] CMD ["/bin/bash", "--login"] # Second stage, overriding entrypoint in the image to not have to override it everytime # we use the Arthur image in a remote environment. entrypoint_remote.sh will fetch config # and schema files from S3 FROM local AS remote -COPY bin/bootstrap_remote_dw.sh \ +COPY --chown=arthur:arthur \ + bin/bootstrap_remote_dw.sh \ "/opt/local/$PROJ_NAME/bin/" ENV PROJ_NAME=$PROJ_NAME -ENTRYPOINT ["/root/entrypoint_remote.sh"] +ENTRYPOINT ["/home/arthur/entrypoint_remote.sh"] diff --git a/bin/arthur.sh b/bin/arthur.sh index 9a3549fe..74d0466a 100755 --- a/bin/arthur.sh +++ b/bin/arthur.sh @@ -136,7 +136,7 @@ docker run --rm --interactive --tty \ --sysctl net.ipv4.tcp_keepalive_time=300 \ --sysctl net.ipv4.tcp_keepalive_intvl=60 \ --sysctl net.ipv4.tcp_keepalive_probes=9 \ - --volume ~/.aws:/root/.aws \ - --volume ~/.ssh:/root/.ssh:ro \ + --volume ~/.aws:/home/arthur/.aws \ + --volume ~/.ssh:/home/arthur/.ssh:ro \ --volume "$data_warehouse_path:/opt/data-warehouse" \ "$docker_image:$tag" diff --git a/bin/docker_run.sh b/bin/docker_run.sh index d8d9e4f2..ea1ec472 100755 --- a/bin/docker_run.sh +++ b/bin/docker_run.sh @@ -154,7 +154,7 @@ case "$action" in docker run --rm --tty \ --env ARTHUR_DEFAULT_PREFIX="$target_env" \ --env DATA_WAREHOUSE_CONFIG="/opt/data-warehouse/$config_path" \ - --volume ~/.aws:/root/.aws \ + --volume ~/.aws:/home/arthur/.aws \ --volume "$data_warehouse_path":/opt/data-warehouse \ $profile_arg \ "arthur-redshift-etl:$tag" \ @@ -171,8 +171,8 @@ case "$action" in --sysctl net.ipv4.tcp_keepalive_time=300 \ --sysctl net.ipv4.tcp_keepalive_intvl=60 \ --sysctl net.ipv4.tcp_keepalive_probes=9 \ - --volume ~/.aws:/root/.aws \ - --volume ~/.ssh:/root/.ssh:ro \ + --volume ~/.aws:/home/arthur/.aws \ + --volume ~/.ssh:/home/arthur/.ssh:ro \ --volume "$data_warehouse_path:/opt/data-warehouse" \ --volume "$(pwd):/opt/src/arthur-redshift-etl" \ --volume /var/run/docker.sock:/var/run/docker.sock:ro \ @@ -191,8 +191,8 @@ case "$action" in --sysctl net.ipv4.tcp_keepalive_time=300 \ --sysctl net.ipv4.tcp_keepalive_intvl=60 \ --sysctl net.ipv4.tcp_keepalive_probes=9 \ - --volume ~/.aws:/root/.aws \ - --volume ~/.ssh:/root/.ssh:ro \ + --volume ~/.aws:/home/arthur/.aws \ + --volume ~/.ssh:/home/arthur/.ssh:ro \ --volume "$data_warehouse_path:/opt/data-warehouse" \ --volume "$(pwd):/opt/src/arthur-redshift-etl" \ --volume /var/run/docker.sock:/var/run/docker.sock:ro \ @@ -206,7 +206,7 @@ case "$action" in docker run --rm --tty \ --env ARTHUR_DEFAULT_PREFIX="$target_env" \ --env DATA_WAREHOUSE_CONFIG="/opt/data-warehouse/$config_path" \ - --volume ~/.aws:/root/.aws \ + --volume ~/.aws:/home/arthur/.aws \ --volume "$data_warehouse_path:/opt/data-warehouse" \ $profile_arg \ "arthur-redshift-etl:$tag" \ @@ -218,7 +218,7 @@ case "$action" in docker run --rm --tty \ --env ARTHUR_DEFAULT_PREFIX="$target_env" \ --env DATA_WAREHOUSE_CONFIG="/opt/data-warehouse/$config_path" \ - --volume ~/.aws:/root/.aws \ + --volume ~/.aws:/home/arthur/.aws \ --volume "$data_warehouse_path":/opt/data-warehouse \ $profile_arg \ "arthur-redshift-etl:$tag" \ diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 87d843b4..9f447c1c 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -31,4 +31,6 @@ if [[ -r 
"/opt/local/redshift_etl/venv/bin/activate" ]]; then fi fi +echo arthur >> sudo chmod 777 /var/run/docker.sock + exec "$@" diff --git a/python/etl/dbt.py b/python/etl/dbt.py index c27c3c53..0db6b7c8 100644 --- a/python/etl/dbt.py +++ b/python/etl/dbt.py @@ -40,7 +40,7 @@ def run_cmd(self, cmd): cmd, volumes={ self.dbt_root: {"bind": "/dbt", "mode": "rw"}, - self.dbt_profiles_dir: {"bind": "/root/.dbt/profiles.yml", "mode": "ro"}, + self.dbt_profiles_dir: {"bind": "/home/arthur/.dbt/profiles.yml", "mode": "ro"}, }, stderr=True, stdout=True, From 85a8d875969054ede2dcbf8799fdb9a1f3f93966 Mon Sep 17 00:00:00 2001 From: ynaim94 Date: Tue, 7 Jun 2022 10:44:48 -0400 Subject: [PATCH 6/7] Wait for build to finish before running container --- Dockerfile | 1 - bin/docker_run.sh | 1 - python/etl/commands.py | 4 ++-- python/etl/dbt.py | 11 ++++++----- requirements-docs.txt | 2 +- 5 files changed, 9 insertions(+), 10 deletions(-) diff --git a/Dockerfile b/Dockerfile index 3eae208b..0e173016 100644 --- a/Dockerfile +++ b/Dockerfile @@ -34,7 +34,6 @@ RUN amazon-linux-extras install docker # Run as non-priviledged user "arthur". RUN useradd --comment 'Arthur ETL' --user-group --create-home arthur && \ - passwd -d arthur && \ mkdir --parent /opt/data-warehouse "/opt/local/$PROJ_NAME" /opt/src/arthur-redshift-etl && \ chown -R arthur.arthur /opt/* diff --git a/bin/docker_run.sh b/bin/docker_run.sh index ea1ec472..cff92058 100755 --- a/bin/docker_run.sh +++ b/bin/docker_run.sh @@ -194,7 +194,6 @@ case "$action" in --volume ~/.aws:/home/arthur/.aws \ --volume ~/.ssh:/home/arthur/.ssh:ro \ --volume "$data_warehouse_path:/opt/data-warehouse" \ - --volume "$(pwd):/opt/src/arthur-redshift-etl" \ --volume /var/run/docker.sock:/var/run/docker.sock:ro \ $publish_arg \ $profile_arg \ diff --git a/python/etl/commands.py b/python/etl/commands.py index 1378e561..bd49c45c 100644 --- a/python/etl/commands.py +++ b/python/etl/commands.py @@ -1764,7 +1764,7 @@ def add_arguments(self, parser): help="show list of dependents (upstream) for every relation", ) group.add_argument( - "--with-dbt", + "--include-dbt", action="store_true", help="show list of dependents (upstream) for every relation", ) @@ -1782,7 +1782,7 @@ def callback(self, args): with_dependents=args.with_dependents, ) - if not args.with_dbt: + if not args.include_dbt: return dbt_model_identifiers = [ diff --git a/python/etl/dbt.py b/python/etl/dbt.py index 0db6b7c8..41796c69 100644 --- a/python/etl/dbt.py +++ b/python/etl/dbt.py @@ -2,7 +2,6 @@ import logging import os import re -import time from collections import namedtuple from typing import Sequence @@ -29,8 +28,11 @@ def build_image(self): img = self.client.api.build( path="dbt", tag=self.tag, dockerfile="Dockerfile", quiet=False, nocache=False ) - time.sleep(5) # The image is not immediately available to pull - return img + + # Wait for the image to build + for _ in img: + continue + logging.info(f"{self.tag} docker image built") def run_cmd(self, cmd): try: @@ -40,11 +42,10 @@ def run_cmd(self, cmd): cmd, volumes={ self.dbt_root: {"bind": "/dbt", "mode": "rw"}, - self.dbt_profiles_dir: {"bind": "/home/arthur/.dbt/profiles.yml", "mode": "ro"}, + self.dbt_profiles_dir: {"bind": "/root/.dbt/profiles.yml", "mode": "ro"}, }, stderr=True, stdout=True, - auto_remove=True, ).decode("utf-8") except docker.errors.ContainerError as exc: print(exc.container.logs()) diff --git a/requirements-docs.txt b/requirements-docs.txt index 0dcee27c..09d4311d 100644 --- a/requirements-docs.txt +++ b/requirements-docs.txt @@ -1,4 
+1,4 @@ myst-parser~=0.16 -Sphinx~=4.5 +Sphinx==4.3.2 sphinx-autodoc-typehints~=1.12 sphinx-book-theme~=0.2 From 00d1be409ea83c03f2097856f0a51f2cd1d91466 Mon Sep 17 00:00:00 2001 From: ynaim94 Date: Wed, 8 Jun 2022 16:28:04 -0400 Subject: [PATCH 7/7] Add logging and abort when no dbt downstream tables are found --- Dockerfile | 9 ++++----- python/etl/commands.py | 4 ++++ 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/Dockerfile b/Dockerfile index 0e173016..b1148f56 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,25 +20,24 @@ RUN yum install -y \ jq \ libyaml-devel \ openssh-clients \ + passwd \ postgresql \ procps-ng \ python3 \ python3-devel \ + sudo \ tar \ tmux \ - vim-minimal \ - sudo \ - passwd + vim-minimal RUN amazon-linux-extras install docker # Run as non-priviledged user "arthur". -RUN useradd --comment 'Arthur ETL' --user-group --create-home arthur && \ +RUN useradd --comment 'Arthur ETL' --user-group --groups wheel --create-home arthur && \ mkdir --parent /opt/data-warehouse "/opt/local/$PROJ_NAME" /opt/src/arthur-redshift-etl && \ chown -R arthur.arthur /opt/* RUN echo "arthur:arthur" | chpasswd -RUN usermod -aG wheel arthur USER arthur diff --git a/python/etl/commands.py b/python/etl/commands.py index bd49c45c..75be649d 100644 --- a/python/etl/commands.py +++ b/python/etl/commands.py @@ -1792,7 +1792,11 @@ def callback(self, args): dbt_downstream_parents = " ".join( [f"{parent}+" for parent in dbt_project.show_downstream_dbt_parents(dbt_model_identifiers)] ) + if not dbt_downstream_parents: + logging.info("No dbt downstream dependents found") + return + logging.info("dbt downstream dependents found") dbt_project.build_image() dbt_stdout = dbt_project.run_cmd( f"dbt list -t dev --exclude redshift --output json "
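
Usage sketch for the series as a whole: the new "--include-dbt" flag on "arthur show_downstream_dependents" composes the helpers added in python/etl/dbt.py roughly as below. This assumes the data-warehouse repo carries a dbt/ directory with its own Dockerfile (referenced by DBTProject.build_image but not included in these patches), that the dbt project defines the db_source(schema, table) macro which the regex in python/etl/dbt.py scans for, and that DBT_ROOT and DBT_PROFILES_DIR are injected by bin/docker_run.sh as wired up in patches 1 and 4. The helper name and the relations argument are illustrative only; the real wiring lives in the show_downstream_dependents callback in python/etl/commands.py.

import os

from etl.dbt import DbtModelIdentifier, DBTProject


def list_downstream_dbt_models(relations):
    """Print dbt models that build on the given (already selected) Arthur relations."""
    # Arthur identifiers look like "schema.table"; dbt sources are matched on the same pair.
    identifiers = [DbtModelIdentifier(*relation.identifier.split(".")) for relation in relations]

    # DBT_ROOT points at the dbt project on the host, DBT_PROFILES_DIR at its profiles.yml;
    # both are passed through by bin/docker_run.sh.
    dbt_project = DBTProject(os.environ["DBT_ROOT"], os.environ["DBT_PROFILES_DIR"])

    # Scan the dbt models' SQL for db_source('schema', 'table') calls that reference one of
    # the selected relations, then turn each hit into a "model+" selector so dbt also lists
    # everything downstream of it.
    parents = " ".join(
        f"{parent}+" for parent in dbt_project.show_downstream_dbt_parents(identifiers)
    )
    if not parents:
        return

    # Build the arthur_dbt:latest image from dbt/Dockerfile, then run "dbt list" inside it
    # against the mounted project and render the result in an Arthur-style listing.
    dbt_project.build_image()
    stdout = dbt_project.run_cmd(
        "dbt list -t dev --exclude redshift --output json"
        f" --resource-type model -s {parents}"
    )
    dbt_project.render_dbt_list(dbt_project.parse_dbt_run_stdout(stdout))

With, say, two selected relations whose db_source references lead to dbt models model_x and model_y (hypothetical names), the container ends up running roughly: dbt list -t dev --exclude redshift --output json --resource-type model -s model_x+ model_y+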