Changing a document's index temporarily #26
-
I need to temporarily change a document's index while running a series of unit tests so that the unit test data does not contaminate the non-test data. For example, I have a document Car that has an index car_index, and I'm building unit tests around getting/saving Car documents in OpenSearch. I do not want the test data to mingle with the actual data. I was thinking the best way would be to temporarily create an index like car_index_test and apply that to the Car document class, so that while the unit tests are running they save data to that index, which would then be removed at the end of the suite. I can't seem to find a way to do this. Is there such a way, or a better way to keep test data from contaminating production data? Thanks! |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 1 reply
-
Hi @johnsonjackc, my advice would be to create a test-specific Document class that extends your existing one (e.g. `Car`) and points at a separate test index. What do you think? |
Beta Was this translation helpful? Give feedback.
-
I usually set up another OpenSearch instance for tests, and add it to the `OPENSEARCH_DSL` config dict. Then I can use it in my tests with the `using` kwarg. For instance, here is my pytest fixture to handle creation/deletion/indexation in my tests:
@pytest.fixture
def setup_opensearch(db):
    """Return a callable helper that creates, populates, deletes or rebuilds indices.

    Calling the returned object is equivalent to calling its ``rebuild``
    method. Every method accepts the same keyword parameters:

    using: str
        Opensearch connection to use as defined in the `OPENSEARCH_DSL`
        setting. Defaults to ``"test"``.
    indices: List[str]
        Names of the indices that should be created/deleted/populated. If
        none are provided, the action is executed on every index of the
        registry. An index is matched by its registered name or by the
        replacement name given for it in ``index_names``.
    index_names: Dict[str, str]
        Dictionary mapping a registered index name to a new name,
        effectively replacing the name of that index.

    NOTE: renaming is done by assigning ``_name`` on the registry's index
    objects and nothing in this fixture restores the previous name, so the
    replacement name stays in effect after the fixture has been used.

    ``db`` is requested but not used in the body; presumably it is the
    pytest-django ``db`` fixture enabling database access -- confirm.
    """

    class SetupOpensearch:
        """Index management helper operating on ``registry.get_indices()``."""

        @staticmethod
        def _selected(indices, index_names):
            """Yield ``(index, target_name)`` for every registry index to act on.

            ``target_name`` is the replacement name from ``index_names`` when
            one exists for the index's current name, else the current name.
            """
            aliases = index_names or {}
            # Replacement name -> registered name. Earlier calls rename the
            # index objects in place, so without this reverse lookup an index
            # renamed by `delete` would no longer match the registered name
            # passed in `indices`, and `create`/`populate` would skip it.
            originals = {alias: name for name, alias in aliases.items()}
            for index in registry.get_indices():
                current = index._name  # noqa
                if indices:
                    original = originals.get(current, current)
                    if current not in indices and original not in indices:
                        continue
                yield index, aliases.get(current, current)

        def create(self, using="test", indices=None, index_names=None):
            """Create the selected indices, renaming them first when aliased."""
            for index, target in self._selected(indices, index_names):
                # Override the index name if an alias is provided
                index._name = target  # noqa
                index.create(using=using)

        def populate(self, using="test", indices=None, index_names=None):
            """Index the documents of the selected indices.

            Only the first document type registered on each index is used.
            """
            aliases = index_names or {}
            for index, _ in self._selected(indices, index_names):
                document = index._doc_types[0]()  # noqa
                # Override the index name if an alias is provided
                doc_index = document._index  # noqa
                doc_index._name = aliases.get(doc_index._name, doc_index._name)
                queryset = document.get_indexing_queryset()
                document.update(queryset, refresh=True, action="index", using=using)

        def delete(self, using="test", indices=None, index_names=None):
            """Delete the selected indices; missing indices are ignored."""
            for index, target in self._selected(indices, index_names):
                # Override the index name if an alias is provided
                index._name = target  # noqa
                index.delete(using=using, ignore_unavailable=True)

        def rebuild(self, using="test", indices=None, index_names=None):
            """Rebuild the selected indices: delete, create, then populate."""
            self.delete(using=using, indices=indices, index_names=index_names)
            self.create(using=using, indices=indices, index_names=index_names)
            self.populate(using=using, indices=indices, index_names=index_names)

        def __call__(self, using="test", indices=None, index_names=None):
            """Shortcut for ``rebuild`` with the same arguments."""
            self.rebuild(using=using, indices=indices, index_names=index_names)

    return SetupOpensearch()
Beta Was this translation helpful? Give feedback.
I usually set up another OpenSearch instance for tests, and add it to the `OPENSEARCH_DSL` config dict. Then I can use it in my tests with the `using` kwarg. For instance, here is my pytest fixture to handle creation/deletion/indexation in my tests: