
Commit

Merge pull request #318 from hellohaptik/develop
Develop to Master 22nd October 2019
chiragjn authored Oct 23, 2019
2 parents 07a69f1 + 8568efb commit f30e705
Showing 63 changed files with 2,533 additions and 1,167 deletions.
2 changes: 1 addition & 1 deletion datastore/__init__.py
@@ -1 +1 @@
from datastore import DataStore
from .datastore import DataStore
83 changes: 48 additions & 35 deletions datastore/datastore.py
@@ -4,7 +4,7 @@

from chatbot_ner.config import ner_logger, CHATBOT_NER_DATASTORE
from datastore import elastic_search
from datastore.constants import (ELASTICSEARCH, ENGINE, ELASTICSEARCH_INDEX_NAME, DEFAULT_ENTITY_DATA_DIRECTORY,
from datastore.constants import (ELASTICSEARCH, ENGINE, ELASTICSEARCH_INDEX_NAME,
ELASTICSEARCH_DOC_TYPE, ELASTICSEARCH_CRF_DATA_INDEX_NAME,
ELASTICSEARCH_CRF_DATA_DOC_TYPE)
from datastore.exceptions import (DataStoreSettingsImproperlyConfiguredException, EngineNotImplementedException,
@@ -126,7 +126,7 @@ def create(self, **kwargs):
)

# FIXME: repopulate does not consider language of the variants
def populate(self, entity_data_directory_path=DEFAULT_ENTITY_DATA_DIRECTORY, csv_file_paths=None, **kwargs):
def populate(self, entity_data_directory_path=None, csv_file_paths=None, **kwargs):
"""
Populates the datastore from csv files stored in directory path indicated by entity_data_directory_path and
from csv files at file paths in csv_file_paths list
@@ -143,6 +143,11 @@ def populate(self, entity_data_directory_path=DEFAULT_ENTITY_DATA_DIRECTORY, csv
All other exceptions raised by elasticsearch-py library
"""
if not (entity_data_directory_path or csv_file_paths):
raise ValueError('Both `entity_data_directory_path` and `csv_file_paths` arguments cannot be None. '
'Either provide a path to directory containing csv files using '
'`entity_data_directory_path` or a list of paths to csv files '
'using `csv_file_paths`')
if self._client_or_connection is None:
self._connect()
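The new guard here (mirrored in repopulate() below) replaces the old DEFAULT_ENTITY_DATA_DIRECTORY fallback with an explicit failure when neither data source is supplied. A minimal sketch of the resulting behavior, assuming a default-configured DataStore; the paths and file names are hypothetical:

    from datastore import DataStore

    db = DataStore()
    # db.populate()  # would now raise ValueError: both arguments cannot be None
    db.populate(entity_data_directory_path='data/entity_data')         # hypothetical directory of csv files
    db.populate(csv_file_paths=['data/city.csv', 'data/cuisine.csv'])  # hypothetical csv files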

@@ -317,7 +322,7 @@ def delete_entity(self, entity_name, **kwargs):
**kwargs)

# FIXME: repopulate does not consider language of the variants
def repopulate(self, entity_data_directory_path=DEFAULT_ENTITY_DATA_DIRECTORY, csv_file_paths=None, **kwargs):
def repopulate(self, entity_data_directory_path=None, csv_file_paths=None, **kwargs):
"""
Deletes the existing data and repopulates it for entities from csv files stored in directory path indicated by
entity_data_directory_path and from csv files at file paths in csv_file_paths list
@@ -334,6 +339,12 @@ def repopulate(self, entity_data_directory_path=DEFAULT_ENTITY_DATA_DIRECTORY, c
DataStoreSettingsImproperlyConfiguredException if connection settings are invalid or missing
All other exceptions raised by elasticsearch-py library
"""
if not (entity_data_directory_path or csv_file_paths):
raise ValueError('Both `entity_data_directory_path` and `csv_file_paths` arguments cannot be None. '
'Either provide a path to directory containing csv files using '
'`entity_data_directory_path` or a list of paths to csv files '
'using `csv_file_paths`')

if self._client_or_connection is None:
self._connect()

@@ -564,37 +575,40 @@ def transfer_entities_elastic_search(self, entity_list):
es_object = elastic_search.transfer.ESTransfer(source=es_url, destination=destination)
es_object.transfer_specific_entities(list_of_entities=entity_list)

def get_crf_data_for_entity_name(self, entity_name, **kwargs):
def get_crf_data_for_entity_name(self, entity_name, languages, **kwargs):
"""
This method is used to obtain the sentences and entities from sentences given entity name
Args:
entity_name (str): Entity name for which training data needs to be obtained
kwargs:
For Elasticsearch:
Refer https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.Elasticsearch.search
languages (List[str]): list of language codes for which data is requested
**kwargs: For Elasticsearch:
Refer https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.Elasticsearch.search
Returns:
results_dictionary (dict): Dictionary consisting of the training data for the given entity.
Raises:
IndexNotFoundException if es_training_index was not found in connection settings
Example:
db = DataStore()
db.get_crf_data_for_entity_name(entity_name, languages, **kwargs)
>> {
    'sentence_list': [
        'My name is hardik',
        'This is my friend Ajay'
    ],
    'entity_list': [
        [
            'hardik'
        ],
        [
            'Ajay'
        ]
    ]
}
"""
ner_logger.debug('Datastore, get_entity_training_data, entity_name %s' % entity_name)
if self._client_or_connection is None:
@@ -612,24 +626,24 @@ def get_crf_data_for_entity_name(self, entity_name, **kwargs):
index_name=es_training_index,
doc_type=self._connection_settings[ELASTICSEARCH_CRF_DATA_DOC_TYPE],
entity_name=entity_name,
languages=languages,
request_timeout=request_timeout,
**kwargs)
ner_logger.debug('Datastore, get_entity_training_data, results_dictionary %s' % str(results_dictionary))
return results_dictionary

def update_entity_crf_data(self, entity_name, entity_list, language_script, sentence_list, **kwargs):
def update_entity_crf_data(self, entity_name, sentences, **kwargs):
"""
This method is used to populate the training data for a given entity
Args:
entity_name (str): Name of the entity for which the training data has to be populated
entity_list (list): List consisting of the entities corresponding to the sentence_list
sentence_list (list): List of sentences for training
language_script (str): Language code for the language script used.
**kwargs:
For Elasticsearch:
sentences (Dict[str, List[Dict[str, str]]]): sentences mapped against their languages
**kwargs: For Elasticsearch:
Refer http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.bulk
Raises:
IndexNotFoundException: Description
IndexNotFoundException if es_training_index was not found in connection settings
"""
if self._client_or_connection is None:
@@ -643,13 +657,12 @@ def update_entity_crf_data(self, entity_name, entity_list, language_script, sent
raise IndexNotFoundException('Index for ELASTICSEARCH_CRF_DATA_INDEX_NAME not found. '
'Please configure the same')

elastic_search.populate.update_entity_crf_data_populate(connection=self._client_or_connection,
index_name=es_training_index,
doc_type=self._connection_settings
[ELASTICSEARCH_CRF_DATA_DOC_TYPE],
logger=ner_logger,
entity_list=entity_list,
sentence_list=sentence_list,
entity_name=entity_name,
language_script=language_script,
**kwargs)
elastic_search \
.populate \
.update_entity_crf_data_populate(connection=self._client_or_connection,
index_name=es_training_index,
doc_type=self._connection_settings[ELASTICSEARCH_CRF_DATA_DOC_TYPE],
logger=ner_logger,
sentences=sentences,
entity_name=entity_name,
**kwargs)
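Taken together, these changes key CRF training data by language on both the write and read paths. A hedged usage sketch, assuming DataStore() picks up its connection from settings and that the per-sentence keys match the SENTENCE and ENTITIES constants imported in populate.py (presumably 'sentence' and 'entities'); the entity name and data are hypothetical:

    from datastore import DataStore

    db = DataStore()

    # Write: sentences are grouped per language code
    sentences = {
        'en': [
            {'sentence': 'My name is Hardik', 'entities': ['Hardik']},
            {'sentence': 'This is my friend Ajay', 'entities': ['Ajay']},
        ],
    }
    db.update_entity_crf_data(entity_name='person_name', sentences=sentences)

    # Read: fetch training data only for the requested languages
    results = db.get_crf_data_for_entity_name(entity_name='person_name', languages=['en'])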
6 changes: 1 addition & 5 deletions datastore/elastic_search/__init__.py
@@ -1,5 +1 @@
import connect
import create
import populate
import query
import transfer
from . import connect, create, populate, query, transfer
166 changes: 100 additions & 66 deletions datastore/elastic_search/populate.py
@@ -7,14 +7,16 @@
# 3rd party imports
from elasticsearch import helpers

# Local imports
from chatbot_ner.config import ner_logger
from datastore import constants
from datastore.elastic_search.query import get_entity_data
from datastore.utils import get_files_from_directory, read_csv, remove_duplicate_data
from external_api.constants import SENTENCE, ENTITIES
from language_utilities.constant import ENGLISH_LANG
from ner_constants import DICTIONARY_DATA_VARIANTS

log_prefix = 'datastore.elastic_search.populate'


@@ -280,83 +282,115 @@ def entity_data_update(connection, index_name, doc_type, entity_data, entity_nam
logger.debug('%s: +++ Completed: add_data_elastic_search() +++' % log_prefix)


def update_entity_crf_data_populate(
connection, index_name, doc_type, entity_list, entity_name, sentence_list, language_script, logger, **kwargs
):
def delete_entity_crf_data(connection, index_name, doc_type, entity_name, languages):
"""Delete CRF data for the given entity and languages.
Args:
connection (Elasticsearch): Elasticsearch client object
index_name (str): name of the index
doc_type (str): type of the documents being indexed
entity_name (str): name of the entity for which the training data has to be deleted
languages (List[str]): list of language codes for which data needs to be deleted
Returns:
Dict[str, Any]: response returned by Elasticsearch for the delete_by_query call
"""
query = {
"query": {
"bool": {
"must": [
{
"match": {
"entity_data": entity_name
}
}
],
"filter": {
"terms": {
"language_script": languages
}
}
}
}
}
return connection.delete_by_query(index=index_name, body=query, doc_type=doc_type)
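As a concrete instance, calling delete_entity_crf_data() with a hypothetical entity_name='person_name' and languages=['en', 'hi'] issues a delete_by_query whose body matches documents for that entity in exactly those languages, leaving data in other languages untouched:

    query = {
        'query': {
            'bool': {
                'must': [{'match': {'entity_data': 'person_name'}}],
                'filter': {'terms': {'language_script': ['en', 'hi']}}
            }
        }
    }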


def update_entity_crf_data_populate(connection, index_name, doc_type, entity_name, sentences, logger, **kwargs):
"""
This method is used to populate the elastic search traininf data.
This method is used to populate the elastic search training data.
Args:
connection: Elasticsearch client object
index_name (str): The name of the index
doc_type (str): The type of the documents being indexed
entity_name (str): Name of the entity for which the training data has to be populated
entity_list (list): List consisting of the entities corresponding to the sentence_list
sentence_list (list): List of sentences for training
language_script (str): The code for the language script
logger: logging object to log at debug and exception levellogging object to log at debug and exception level
connection (Elasticsearch): Elasticsearch client object
index_name (str): name of the index
doc_type (str): type of the documents being indexed
entity_name (str): name of the entity for which the training data has to be populated
sentences (Dict[str, List[Dict[str, str]]]): sentences collected per language
logger: logging object
**kwargs: Refer http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.bulk
"""
logger.debug('%s: +++ Started: external_api_training_data_entity_update() +++' % log_prefix)
logger.debug('%s: +++ Started: delete_entity_by_name() +++' % log_prefix)
delete_entity_by_name(connection=connection, index_name=index_name, doc_type=doc_type,
entity_name=entity_name, logger=logger, **kwargs)
logger.debug('%s: +++ Completed: delete_entity_by_name() +++' % log_prefix)
logger.debug('[{0}] Started: external_api_training_data_entity_update()'.format(log_prefix))

logger.debug('%s: +++ Started: add_training_data_elastic_search() +++' % log_prefix)
add_training_data_elastic_search(connection=connection, index_name=index_name, doc_type=doc_type,
entity_name=entity_name,
entity_list=entity_list,
sentence_list=sentence_list,
language_script=language_script, logger=logger, **kwargs)
logger.debug('%s: +++ Completed: add_training_data_elastic_search() +++' % log_prefix)
logger.debug('[{0}] Started: delete_entity_crf_data()'.format(log_prefix))
languages = list(sentences.keys())
delete_entity_crf_data(connection=connection, index_name=index_name, doc_type=doc_type,
entity_name=entity_name, languages=languages)
logger.debug('[{0}] Completed: delete_entity_crf_data()'.format(log_prefix))

logger.debug('[{0}] Started: add_crf_training_data_elastic_search()'.format(log_prefix))
add_crf_training_data_elastic_search(connection=connection,
index_name=index_name,
doc_type=doc_type,
entity_name=entity_name,
sentences=sentences,
logger=logger, **kwargs)
logger.debug('[{0}] Completed: add_crf_training_data_elastic_search()'.format(log_prefix))

def add_training_data_elastic_search(
connection, index_name, doc_type, entity_name, entity_list,
sentence_list, language_script, logger, **kwargs
):
logger.debug('[{0}] Completed: external_api_training_data_entity_update()'.format(log_prefix))


def add_crf_training_data_elastic_search(connection, index_name, doc_type, entity_name, sentences, logger, **kwargs):
"""
Adds all sentences and the corresponding entities to the specified index.
If the same named entity is found, a delete followed by an update is triggered.
Args:
connection: Elasticsearch client object
index_name (str): The name of the index
doc_type (str): The type of the documents being indexed
entity_name (str): Name of the entity for which the training data has to be populated
entity_list (list): List consisting of the entities corresponding to the sentence_list
sentence_list (list): List of sentences for training
logger: logging object to log at debug and exception level
language_script (str): Language code of the entity script
kwargs:
Refer http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.bulk
connection (Elasticsearch): Elasticsearch client object
index_name (str): name of the index
doc_type (str): type of the documents being indexed
entity_name (str): name of the entity for which the training data has to be populated
sentences (Dict[str, List[Dict[str, str]]]): sentences mapped against their languages
logger: logging object to log at debug and exception level
**kwargs: Refer http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.bulk
Example of underlying index query
{'_index': 'training_index',
'entity_data': 'name',
'sentence': ['My name is Ajay and this is my friend Hardik'],
'entities': ['Ajay', 'Hardik'],
'language_script': 'en',
'_type': 'training_index',
'_op_type': 'index'
}
"""
str_query = []
for sentence, entities in zip(sentence_list, entity_list):
query_dict = {'_index': index_name,
'entity_data': entity_name,
'sentence': sentence,
'entities': entities,
'language_script': language_script,
'_type': doc_type,
'_op_type': 'index'
}
str_query.append(query_dict)
if len(str_query) > constants.ELASTICSEARCH_BULK_HELPER_MESSAGE_SIZE:
result = helpers.bulk(connection, str_query, stats_only=True, **kwargs)
logger.debug('%s: \t++ %s status %s ++' % (log_prefix, entity_name, result))
str_query = []
if str_query:
result = helpers.bulk(connection, str_query, stats_only=True, **kwargs)
logger.debug('%s: \t++ %s status %s ++' % (log_prefix, entity_name, result))
queries = []
for language, language_sentences in sentences.items():
for sentence in language_sentences:
query_dict = {'_index': index_name,
'entity_data': entity_name,
'sentence': sentence[SENTENCE],
'entities': sentence[ENTITIES],
'language_script': language,
'_type': doc_type,
'_op_type': 'index'
}
queries.append(query_dict)
if len(queries) > constants.ELASTICSEARCH_BULK_HELPER_MESSAGE_SIZE:
result = helpers.bulk(connection, queries, stats_only=True, **kwargs)
logger.debug('[{0}] Insert: {1} with status {2}'.format(log_prefix, entity_name, result))
queries = []
if queries:
result = helpers.bulk(connection, queries, stats_only=True, **kwargs)
logger.debug('[{0}] Insert: {1} with status {2}'.format(log_prefix, entity_name, result))
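The indexing loop above buffers bulk actions and flushes them through helpers.bulk whenever the buffer exceeds constants.ELASTICSEARCH_BULK_HELPER_MESSAGE_SIZE, with a final flush for the remainder. The same batching pattern in isolation, assuming a batch size of 1000 in place of that constant:

    from elasticsearch import helpers

    BULK_SIZE = 1000  # assumption; stands in for constants.ELASTICSEARCH_BULK_HELPER_MESSAGE_SIZE

    def bulk_index(connection, actions):
        buffered = []
        for action in actions:
            buffered.append(action)
            if len(buffered) > BULK_SIZE:
                # stats_only=True makes helpers.bulk return (succeeded, failed) counts
                helpers.bulk(connection, buffered, stats_only=True)
                buffered = []
        if buffered:  # flush whatever is left over
            helpers.bulk(connection, buffered, stats_only=True)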


def delete_entity_data_by_values(connection, index_name, doc_type, entity_name, values=None, **kwargs):
