[FEATURE] context.save_datasource (#6009)
anthonyburdi authored Sep 16, 2022
1 parent 4449709 commit 4b0276e
Showing 2 changed files with 276 additions and 3 deletions.
@@ -461,6 +461,44 @@ def set_config(self, project_config: DataContextConfig) -> None:
        self._project_config = project_config
        self.variables.config = project_config

    def save_datasource(
        self, datasource: Union[LegacyDatasource, BaseDatasource]
    ) -> Union[LegacyDatasource, BaseDatasource]:
        """Save a Datasource to the configured DatasourceStore.

        Stores the underlying DatasourceConfig in the store and Data Context config,
        updates the cached Datasource and returns the Datasource.
        The cached and returned Datasource is re-constructed from the config
        that was stored, as some store implementations make edits to the stored
        config (e.g. adding identifiers).

        Args:
            datasource: Datasource to store.

        Returns:
            The datasource, after storing and retrieving the stored config.
        """
        datasource_config_dict: dict = datasourceConfigSchema.dump(datasource.config)
        datasource_config = DatasourceConfig(**datasource_config_dict)
        datasource_name: str = datasource.name

        updated_datasource_config_from_store: DatasourceConfig = self._datasource_store.set(  # type: ignore[attr-defined]
            key=None, value=datasource_config
        )
        # Use the updated datasource config, since the store may populate additional info on update.
        self.config.datasources[datasource_name] = updated_datasource_config_from_store  # type: ignore[index,assignment]

        # Also use the updated config to initialize a datasource for the cache and overwrite the existing datasource.
        substituted_config = self._perform_substitutions_on_datasource_config(
            updated_datasource_config_from_store
        )
        updated_datasource: Union[
            LegacyDatasource, BaseDatasource
        ] = self._instantiate_datasource_from_config(config=substituted_config)
        self._cached_datasources[datasource_name] = updated_datasource
        return updated_datasource

    def add_datasource(
        self,
        name: str,
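
For readers of the diff, here is a minimal usage sketch of the new save_datasource method. This is an illustration, not part of the commit: it assumes an existing project with a configured datasource, and the name "my_datasource" is made up.

# Minimal usage sketch for context.save_datasource (names are illustrative).
from great_expectations import DataContext

context = DataContext()  # loads the project's great_expectations.yml

# Persist an existing datasource: save_datasource() writes the underlying
# DatasourceConfig to the DatasourceStore, updates the context config and the
# datasource cache, and returns a datasource rebuilt from the stored config.
datasource = context.get_datasource("my_datasource")
saved_datasource = context.save_datasource(datasource)

# Prefer the returned object; some stores enrich the stored config
# (e.g. by adding ids), so it can differ from the datasource passed in.
assert saved_datasource.name == datasource.name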
tests/data_context/cloud_data_context/test_datasource_crud.py (238 additions, 3 deletions)
@@ -1,22 +1,33 @@
"""This file is meant for integration tests related to datasource CRUD."""
import copy
from typing import Callable
from typing import TYPE_CHECKING, Callable, Tuple, Type, Union
from unittest.mock import patch

import pytest

from great_expectations import DataContext
from great_expectations.data_context import BaseDataContext, CloudDataContext
from great_expectations.core.serializer import (
AbstractConfigSerializer,
DictConfigSerializer,
)
from great_expectations.data_context import (
AbstractDataContext,
BaseDataContext,
CloudDataContext,
)
from great_expectations.data_context.types.base import (
DatasourceConfig,
datasourceConfigSchema,
)
from great_expectations.datasource import BaseDatasource
from great_expectations.datasource import BaseDatasource, Datasource, LegacyDatasource
from great_expectations.datasource.datasource_serializer import (
JsonDatasourceConfigSerializer,
)
from tests.data_context.conftest import MockResponse

if TYPE_CHECKING:
from _pytest.fixtures import FixtureRequest


@pytest.mark.cloud
@pytest.mark.integration
@@ -358,3 +369,227 @@ def test_cloud_data_context_add_datasource(
    assert stored_datasource.name == datasource_name
    assert stored_datasource.config["name"] == datasource_name
    assert stored_data_connector.name == data_connector_name


def _save_datasource_assertions(
    context: AbstractDataContext,
    datasource_to_save_config: DatasourceConfig,
    datasource_to_save: Datasource,
    saved_datasource: Union[LegacyDatasource, BaseDatasource],
    attributes_to_verify: Tuple[str, ...] = (
        "name",
        "execution_engine",
        "data_connectors",
    ),
):
    datasource_name: str = datasource_to_save.name
    # Make sure the datasource config got into the context config
    assert len(context.config.datasources) == 1
    assert context.config.datasources[datasource_name] == datasource_to_save_config

    # Make sure the datasource got into the cache
    assert len(context._cached_datasources) == 1
    cached_datasource = context._cached_datasources[datasource_name]

    # Make sure the stored and returned datasource is the same object as the cached datasource
    assert id(saved_datasource) == id(cached_datasource)
    assert saved_datasource == cached_datasource

    # Make sure the stored and returned datasource are otherwise equal
    serializer: AbstractConfigSerializer = DictConfigSerializer(
        schema=datasourceConfigSchema
    )
    saved_datasource_dict = serializer.serialize(
        datasourceConfigSchema.load(saved_datasource.config)
    )
    datasource_to_save_dict = serializer.serialize(
        datasourceConfigSchema.load(datasource_to_save.config)
    )

    for attribute in attributes_to_verify:
        assert saved_datasource_dict[attribute] == datasource_to_save_dict[attribute]


@pytest.mark.unit
def test_non_cloud_backed_data_context_save_datasource_empty_store(
    empty_data_context: DataContext,
    datasource_config_with_names: DatasourceConfig,
):
    """What does this test and why?
    This tests that context.save_datasource() does store config in the context
    config and in the cache, and also returns the datasource.
    """
    context: DataContext = empty_data_context
    # Make sure the fixture has the right configuration
    assert len(context.list_datasources()) == 0

    datasource_to_save: Datasource = context._build_datasource_from_config(
        datasource_config_with_names
    )

    with patch(
        "great_expectations.data_context.store.datasource_store.DatasourceStore.set",
        autospec=True,
        return_value=datasource_config_with_names,
    ):
        saved_datasource: Union[
            LegacyDatasource, BaseDatasource
        ] = context.save_datasource(datasource_to_save)

    _save_datasource_assertions(
        context=context,
        datasource_to_save_config=datasource_config_with_names,
        datasource_to_save=datasource_to_save,
        saved_datasource=saved_datasource,
    )


@pytest.mark.unit
def test_non_cloud_backed_data_context_save_datasource_overwrite_existing(
    empty_data_context: DataContext,
    datasource_config_with_names: DatasourceConfig,
):
    """What does this test and why?
    This ensures there are no checks that stop an overwrite/update of an
    existing datasource in context.save_datasource(). It does not test the
    underlying store or store backend."""
    context: DataContext = empty_data_context
    # Make sure the fixture has the right configuration
    assert len(context.list_datasources()) == 0

    # 1. Add datasource to empty context
    serializer: AbstractConfigSerializer = DictConfigSerializer(
        schema=datasourceConfigSchema
    )
    datasource_config_with_names_dict = serializer.serialize(
        datasource_config_with_names
    )
    context.add_datasource(**datasource_config_with_names_dict)
    assert len(context.list_datasources()) == 1

    # 2. Create new datasource (slightly different, but same name) and call context.save_datasource()
    datasource_config_with_names_modified = copy.deepcopy(datasource_config_with_names)
    data_connector_name: str = tuple(
        datasource_config_with_names_modified.data_connectors.keys()
    )[0]
    data_connector_config = copy.deepcopy(
        datasource_config_with_names_modified.data_connectors[data_connector_name]
    )
    datasource_config_with_names_modified.data_connectors.pop(data_connector_name, None)

    # Rename the data connector
    new_data_connector_name: str = "new_data_connector_name"
    data_connector_config["name"] = new_data_connector_name
    datasource_config_with_names_modified.data_connectors[
        new_data_connector_name
    ] = data_connector_config

    new_datasource: Datasource = context._build_datasource_from_config(
        datasource_config_with_names_modified
    )

    orig_datasource_name: str = datasource_config_with_names.name
    datasource_name: str = new_datasource.name
    assert orig_datasource_name == datasource_name

    pre_update_datasource = context.get_datasource(datasource_name)
    assert tuple(pre_update_datasource.data_connectors.keys())[0] == data_connector_name

    # 3. Make sure no exceptions are raised when saving.
    with patch(
        "great_expectations.data_context.store.datasource_store.DatasourceStore.set",
        autospec=True,
        return_value=datasource_config_with_names_modified,
    ):
        saved_datasource: Union[
            LegacyDatasource, BaseDatasource
        ] = context.save_datasource(new_datasource)

    _save_datasource_assertions(
        context=context,
        datasource_to_save_config=datasource_config_with_names_modified,
        datasource_to_save=new_datasource,
        saved_datasource=saved_datasource,
    )

    # Make sure the data connector name was updated
    updated_datasource_data_connector_name: str = tuple(
        saved_datasource.data_connectors.keys()
    )[0]
    assert updated_datasource_data_connector_name == new_data_connector_name


@pytest.mark.cloud
@pytest.mark.unit
def test_cloud_data_context_save_datasource_empty_store(
    empty_cloud_data_context: CloudDataContext,
    datasource_config_with_names: DatasourceConfig,
    datasource_config_with_names_and_ids: DatasourceConfig,
):
    """What does this test and why?
    Any Data Context in cloud mode should save to the cloud backed Datasource
    store when calling save_datasource. When saving, it should use the id from
    the response to create the datasource, and update both the
    config and cache."""
    context: CloudDataContext = empty_cloud_data_context

    # Make sure the fixture has the right configuration
    assert len(context.list_datasources()) == 0

    datasource_to_save: Datasource = context._build_datasource_from_config(
        datasource_config_with_names
    )
    data_connector_name: str = tuple(datasource_to_save.data_connectors.keys())[0]

    with patch(
        "great_expectations.data_context.store.datasource_store.DatasourceStore.set",
        autospec=True,
        return_value=datasource_config_with_names_and_ids,
    ):
        saved_datasource: Union[
            LegacyDatasource, BaseDatasource
        ] = context.save_datasource(datasource_to_save)

    _save_datasource_assertions(
        context=context,
        datasource_to_save_config=datasource_config_with_names_and_ids,
        datasource_to_save=datasource_to_save,
        saved_datasource=saved_datasource,
        attributes_to_verify=(
            "name",
            "execution_engine",
        ),
    )

    serializer: AbstractConfigSerializer = DictConfigSerializer(
        schema=datasourceConfigSchema
    )
    saved_datasource_dict = serializer.serialize(
        datasourceConfigSchema.load(saved_datasource.config)
    )
    orig_datasource_dict = serializer.serialize(
        datasourceConfigSchema.load(datasource_to_save.config)
    )

    # The data connectors should match once the store-added id is removed.
    updated_datasource_dict_no_data_connector_id = copy.deepcopy(saved_datasource_dict)
    updated_datasource_dict_no_data_connector_id["data_connectors"][
        data_connector_name
    ].pop("id", None)
    assert (
        updated_datasource_dict_no_data_connector_id["data_connectors"]
        == orig_datasource_dict["data_connectors"]
    )

    # Make sure that the id is populated only in the updated and cached datasource
    assert datasource_to_save.id is None
    assert datasource_to_save.data_connectors[data_connector_name].id is None
    assert saved_datasource.id == datasource_config_with_names_and_ids.id
    assert (
        saved_datasource.data_connectors[data_connector_name].id
        == datasource_config_with_names_and_ids.data_connectors[data_connector_name][
            "id"
        ]
    )
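
All three tests share one isolation technique: DatasourceStore.set is patched so no real store backend is exercised, and its return value stands in for the (possibly enriched) stored config. That pattern can be distilled into the following sketch; the helper name save_with_stubbed_store is hypothetical and not part of this commit, and it assumes great_expectations is importable.

# Hypothetical helper distilling the store-mocking pattern used above.
from unittest.mock import patch

from great_expectations.data_context.types.base import DatasourceConfig


def save_with_stubbed_store(context, datasource, stored_config: DatasourceConfig):
    """Call context.save_datasource() while DatasourceStore.set is stubbed.

    Returning stored_config from the patched set() simulates a backend
    (e.g. GX Cloud) that enriches the config, for instance with ids, on write.
    """
    with patch(
        "great_expectations.data_context.store.datasource_store.DatasourceStore.set",
        autospec=True,
        return_value=stored_config,
    ):
        return context.save_datasource(datasource)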
