From 4b0276e03f134c372b4bcd0f751acac3839e40c8 Mon Sep 17 00:00:00 2001
From: Anthony Burdi
Date: Fri, 16 Sep 2022 18:28:33 -0400
Subject: [PATCH] [FEATURE] context.save_datasource (#6009)

---
 .../data_context/abstract_data_context.py |  38 +++
 .../test_datasource_crud.py               | 241 +++++++++++++++++-
 2 files changed, 276 insertions(+), 3 deletions(-)

diff --git a/great_expectations/data_context/data_context/abstract_data_context.py b/great_expectations/data_context/data_context/abstract_data_context.py
index c6ae2dfea9b1..24411dbef7b7 100644
--- a/great_expectations/data_context/data_context/abstract_data_context.py
+++ b/great_expectations/data_context/data_context/abstract_data_context.py
@@ -461,6 +461,44 @@ def set_config(self, project_config: DataContextConfig) -> None:
         self._project_config = project_config
         self.variables.config = project_config
 
+    def save_datasource(
+        self, datasource: Union[LegacyDatasource, BaseDatasource]
+    ) -> Union[LegacyDatasource, BaseDatasource]:
+        """Save a Datasource to the configured DatasourceStore.
+
+        Stores the underlying DatasourceConfig in the store and Data Context config,
+        updates the cached Datasource and returns the Datasource.
+        The cached and returned Datasource is re-constructed from the config
+        that was stored as some store implementations make edits to the stored
+        config (e.g. adding identifiers).
+
+        Args:
+            datasource: Datasource to store.
+
+        Returns:
+            The datasource, after storing and retrieving the stored config.
+        """
+
+        datasource_config_dict: dict = datasourceConfigSchema.dump(datasource.config)
+        datasource_config = DatasourceConfig(**datasource_config_dict)
+        datasource_name: str = datasource.name
+
+        updated_datasource_config_from_store: DatasourceConfig = self._datasource_store.set(  # type: ignore[attr-defined]
+            key=None, value=datasource_config
+        )
+        # Use the updated datasource config, since the store may populate additional info on update.
+        self.config.datasources[datasource_name] = updated_datasource_config_from_store  # type: ignore[index,assignment]
+
+        # Also use the updated config to initialize a datasource for the cache and overwrite the existing datasource.
+        substituted_config = self._perform_substitutions_on_datasource_config(
+            updated_datasource_config_from_store
+        )
+        updated_datasource: Union[
+            LegacyDatasource, BaseDatasource
+        ] = self._instantiate_datasource_from_config(config=substituted_config)
+        self._cached_datasources[datasource_name] = updated_datasource
+        return updated_datasource
+
     def add_datasource(
         self,
         name: str,
diff --git a/tests/data_context/cloud_data_context/test_datasource_crud.py b/tests/data_context/cloud_data_context/test_datasource_crud.py
index 43a06a6c4d57..c72825f08263 100644
--- a/tests/data_context/cloud_data_context/test_datasource_crud.py
+++ b/tests/data_context/cloud_data_context/test_datasource_crud.py
@@ -1,22 +1,33 @@
 """This file is meant for integration tests related to datasource CRUD."""
 import copy
-from typing import Callable
+from typing import TYPE_CHECKING, Callable, Tuple, Type, Union
 from unittest.mock import patch
 
 import pytest
 
 from great_expectations import DataContext
-from great_expectations.data_context import BaseDataContext, CloudDataContext
+from great_expectations.core.serializer import (
+    AbstractConfigSerializer,
+    DictConfigSerializer,
+)
+from great_expectations.data_context import (
+    AbstractDataContext,
+    BaseDataContext,
+    CloudDataContext,
+)
 from great_expectations.data_context.types.base import (
     DatasourceConfig,
     datasourceConfigSchema,
 )
-from great_expectations.datasource import BaseDatasource
+from great_expectations.datasource import BaseDatasource, Datasource, LegacyDatasource
 from great_expectations.datasource.datasource_serializer import (
     JsonDatasourceConfigSerializer,
 )
 from tests.data_context.conftest import MockResponse
 
+if TYPE_CHECKING:
+    from _pytest.fixtures import FixtureRequest
+
 
 @pytest.mark.cloud
 @pytest.mark.integration
@@ -358,3 +369,227 @@ def test_cloud_data_context_add_datasource(
     assert stored_datasource.name == datasource_name
     assert stored_datasource.config["name"] == datasource_name
     assert stored_data_connector.name == data_connector_name
+
+
+def _save_datasource_assertions(
+    context: AbstractDataContext,
+    datasource_to_save_config: DatasourceConfig,
+    datasource_to_save: Datasource,
+    saved_datasource: Union[LegacyDatasource, BaseDatasource],
+    attributes_to_verify: Tuple[str, ...] = (
+        "name",
+        "execution_engine",
+        "data_connectors",
+    ),
+):
+    datasource_name: str = datasource_to_save.name
+    # Make sure the datasource config got into the context config
+    assert len(context.config.datasources) == 1
+    assert context.config.datasources[datasource_name] == datasource_to_save_config
+
+    # Make sure the datasource got into the cache
+    assert len(context._cached_datasources) == 1
+    cached_datasource = context._cached_datasources[datasource_name]
+
+    # Make sure the stored and returned datasource is the same one as the cached datasource
+    assert id(saved_datasource) == id(cached_datasource)
+    assert saved_datasource == cached_datasource
+
+    # Make sure the stored and returned datasource are otherwise equal
+    serializer: AbstractConfigSerializer = DictConfigSerializer(
+        schema=datasourceConfigSchema
+    )
+    saved_datasource_dict = serializer.serialize(
+        datasourceConfigSchema.load(saved_datasource.config)
+    )
+    datasource_to_save_dict = serializer.serialize(
+        datasourceConfigSchema.load(datasource_to_save.config)
+    )
+
+    for attribute in attributes_to_verify:
+        assert saved_datasource_dict[attribute] == datasource_to_save_dict[attribute]
+
+
+@pytest.mark.unit
+def test_non_cloud_backed_data_context_save_datasource_empty_store(
+    empty_data_context: DataContext,
+    datasource_config_with_names: DatasourceConfig,
+):
+    """What does this test and why?
+
+    This tests that context.save_datasource() does store config in the context
+    config and in the cache, and also returns the datasource.
+    """
+    context: DataContext = empty_data_context
+    # Make sure the fixture has the right configuration
+    assert len(context.list_datasources()) == 0
+
+    datasource_to_save: Datasource = context._build_datasource_from_config(
+        datasource_config_with_names
+    )
+
+    with patch(
+        "great_expectations.data_context.store.datasource_store.DatasourceStore.set",
+        autospec=True,
+        return_value=datasource_config_with_names,
+    ):
+        saved_datasource: Union[
+            LegacyDatasource, BaseDatasource
+        ] = context.save_datasource(datasource_to_save)
+
+    _save_datasource_assertions(
+        context=context,
+        datasource_to_save_config=datasource_config_with_names,
+        datasource_to_save=datasource_to_save,
+        saved_datasource=saved_datasource,
+    )
+
+
+@pytest.mark.unit
+def test_non_cloud_backed_data_context_save_datasource_overwrite_existing(
+    empty_data_context: DataContext,
+    datasource_config_with_names: DatasourceConfig,
+):
+    """What does this test and why?
+    This ensures there are no checks that stop an overwrite/update of an
+    existing datasource in context.save_datasource(). It does not test the
+    underlying store or store backend."""
+    context: DataContext = empty_data_context
+    # Make sure the fixture has the right configuration
+    assert len(context.list_datasources()) == 0
+
+    # 1. Add datasource to empty context
+    serializer: AbstractConfigSerializer = DictConfigSerializer(
+        schema=datasourceConfigSchema
+    )
+    datasource_config_with_names_dict = serializer.serialize(
+        datasource_config_with_names
+    )
+    context.add_datasource(**datasource_config_with_names_dict)
+    assert len(context.list_datasources()) == 1
+
+    # 2. Create new datasource (slightly different, but same name) and call context.save_datasource()
+    datasource_config_with_names_modified = copy.deepcopy(datasource_config_with_names)
+    data_connector_name: str = tuple(
+        datasource_config_with_names_modified.data_connectors.keys()
+    )[0]
+    data_connector_config = copy.deepcopy(
+        datasource_config_with_names_modified.data_connectors[data_connector_name]
+    )
+    datasource_config_with_names_modified.data_connectors.pop(data_connector_name, None)
+
+    # Rename the data connector
+    new_data_connector_name: str = "new_data_connector_name"
+    data_connector_config["name"] = new_data_connector_name
+    datasource_config_with_names_modified.data_connectors[
+        new_data_connector_name
+    ] = data_connector_config
+
+    new_datasource: Datasource = context._build_datasource_from_config(
+        datasource_config_with_names_modified
+    )
+
+    orig_datasource_name: str = datasource_config_with_names.name
+    datasource_name: str = new_datasource.name
+    assert orig_datasource_name == datasource_name
+
+    pre_update_datasource = context.get_datasource(datasource_name)
+    assert tuple(pre_update_datasource.data_connectors.keys())[0] == data_connector_name
+
+    # 3. Make sure no exceptions are raised when saving.
+    with patch(
+        "great_expectations.data_context.store.datasource_store.DatasourceStore.set",
+        autospec=True,
+        return_value=datasource_config_with_names_modified,
+    ):
+
+        saved_datasource: Union[
+            LegacyDatasource, BaseDatasource
+        ] = context.save_datasource(new_datasource)
+
+    _save_datasource_assertions(
+        context=context,
+        datasource_to_save_config=datasource_config_with_names_modified,
+        datasource_to_save=new_datasource,
+        saved_datasource=saved_datasource,
+    )
+
+    # Make sure the name was updated
+    updated_datasource_data_connector_name: str = tuple(
+        saved_datasource.data_connectors.keys()
+    )[0]
+    assert updated_datasource_data_connector_name == new_data_connector_name
+
+
+@pytest.mark.cloud
+@pytest.mark.unit
+def test_cloud_data_context_save_datasource_empty_store(
+    empty_cloud_data_context: CloudDataContext,
+    datasource_config_with_names: DatasourceConfig,
+    datasource_config_with_names_and_ids: DatasourceConfig,
+):
+    """What does this test and why?
+    Any Data Context in cloud mode should save to the cloud backed Datasource
+    store when calling save_datasource. When saving, it should use the id from
+    the response to create the datasource, and update both the
+    config and cache."""
+
+    context: CloudDataContext = empty_cloud_data_context
+
+    # Make sure the fixture has the right configuration
+    assert len(context.list_datasources()) == 0
+
+    datasource_to_save: Datasource = context._build_datasource_from_config(
+        datasource_config_with_names
+    )
+    data_connector_name: str = tuple(datasource_to_save.data_connectors.keys())[0]
+
+    with patch(
+        "great_expectations.data_context.store.datasource_store.DatasourceStore.set",
+        autospec=True,
+        return_value=datasource_config_with_names_and_ids,
+    ):
+        saved_datasource: Union[
+            LegacyDatasource, BaseDatasource
+        ] = context.save_datasource(datasource_to_save)
+
+    _save_datasource_assertions(
+        context=context,
+        datasource_to_save_config=datasource_config_with_names_and_ids,
+        datasource_to_save=datasource_to_save,
+        saved_datasource=saved_datasource,
+        attributes_to_verify=(
+            "name",
+            "execution_engine",
+        ),
+    )
+
+    serializer: AbstractConfigSerializer = DictConfigSerializer(
+        schema=datasourceConfigSchema
+    )
+    saved_datasource_dict = serializer.serialize(
+        datasourceConfigSchema.load(saved_datasource.config)
+    )
+    orig_datasource_dict = serializer.serialize(
+        datasourceConfigSchema.load(datasource_to_save.config)
+    )
+
+    updated_datasource_dict_no_datasource_id = copy.deepcopy(saved_datasource_dict)
+    updated_datasource_dict_no_datasource_id["data_connectors"][
+        data_connector_name
+    ].pop("id", None)
+    assert (
+        updated_datasource_dict_no_datasource_id["data_connectors"]
+        == orig_datasource_dict["data_connectors"]
+    )
+
+    # Make sure that the id is populated only in the updated and cached datasource
+    assert datasource_to_save.id is None
+    assert datasource_to_save.data_connectors[data_connector_name].id is None
+    assert saved_datasource.id == datasource_config_with_names_and_ids.id
+    assert (
+        saved_datasource.data_connectors[data_connector_name].id
+        == datasource_config_with_names_and_ids.data_connectors[
+            data_connector_name
+        ]["id"]
+    )