Skip to content

Commit

Permalink
Merge pull request #129 from mattflax/GH127_Override_ES_index
Browse files Browse the repository at this point in the history
#127: Implement ES index name overrides.
  • Loading branch information
agazzarini authored Jan 12, 2021
2 parents 02b4275 + fabd4bc commit a0a92bf
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* SearchPlatform implementation for connecting to and reading from an external
Expand All @@ -55,6 +56,7 @@ public class ExternalElasticsearch extends Elasticsearch {
private static final String NAME = "External Elasticsearch";
static final String SETTINGS_FILE = "index-settings.json";

private final Map<String, IndexSettings> indexSettings = new HashMap<>();
private final Map<String, RestHighLevelClient> indexClients = new HashMap<>();

@Override
Expand All @@ -76,6 +78,7 @@ public void load(File dataToBeIndexed, File settingsFile, String collection, Str
try {
// Load the index settings for this version of the search platform
IndexSettings settings = mapper.readValue(settingsFile, IndexSettings.class);
indexSettings.put(getFullyQualifiedDomainName(collection, version), settings);
if (indexClients.get(version) == null) {
indexClients.put(version, initialiseClient(settings.getHostUrls(), settings.getUser(), settings.getPassword()));
}
Expand Down Expand Up @@ -111,10 +114,10 @@ private RestHighLevelClient initialiseClient(List<String> hosts, String user, St
}

@Override
public QueryOrSearchResponse executeQuery(final String collection, String version, final String query, final String[] fields, final int maxRows) {
public QueryOrSearchResponse executeQuery(final String collection, final String version, final String query, final String[] fields, final int maxRows) {

try {
final SearchRequest request = buildSearchRequest(collection, query, fields, maxRows);
final SearchRequest request = buildSearchRequest(resolveIndexName(collection, version), query, fields, maxRows);
final SearchResponse response = runQuery(version, request);
return convertResponse(response);
} catch (final ElasticsearchException e) {
Expand Down Expand Up @@ -164,28 +167,46 @@ private void closeClient(RestHighLevelClient client) {
@Override
public boolean checkCollection(String collection, String version) {
try {
return indexClients.get(version).indices().exists(new GetIndexRequest(collection), RequestOptions.DEFAULT);
return indexClients.get(version).indices().exists(new GetIndexRequest(resolveIndexName(collection, version)), RequestOptions.DEFAULT);
} catch (IOException e) {
LOGGER.error("Caught IOException checking collection {} version {}: {}", collection, version, e.getMessage());
LOGGER.error(e);
return false;
}
}

/**
* Resolve the correct index name for the version, allowing the name to be
* overridden in config if required.
*
* @param defaultIndex the default index name.
* @param version the configuration version.
* @return the correct index name.
*/
private String resolveIndexName(String defaultIndex, String version) {
return Optional.ofNullable(indexSettings.get(getFullyQualifiedDomainName(defaultIndex, version)).getIndex())
.orElse(defaultIndex);
}

public static class IndexSettings {
@JsonProperty("hostUrls")
private final List<String> hostUrls;

@JsonProperty("index")
private final String index;

@JsonProperty("user")
private final String user;

@JsonProperty("password")
private final String password;

public IndexSettings(@JsonProperty("hostUrls") List<String> hostUrls,
@JsonProperty("index") String index,
@JsonProperty("user") String user,
@JsonProperty("password") String password) {
this.hostUrls = hostUrls;
this.index = index;
this.user = user;
this.password = password;
}
Expand All @@ -194,6 +215,10 @@ public List<String> getHostUrls() {
return hostUrls;
}

public String getIndex() {
return index;
}

public String getUser() {
return user;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
*/
package io.sease.rre.search.api.impl;

import io.sease.rre.search.api.QueryOrSearchResponse;
import io.sease.rre.search.api.SearchPlatform;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
Expand All @@ -31,7 +34,10 @@

import java.io.File;
import java.io.FileWriter;
import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

Expand All @@ -48,59 +54,157 @@
*/
public class ExternalElasticSearchIT {

private static String ELASTICSEARCH_CONTAINER_BASE = "docker.elastic.co/elasticsearch/elasticsearch";
private static String DEFAULT_ELASTICSEARCH_VERSION = "7.5.0";
private static final String ELASTICSEARCH_CONTAINER_BASE = "docker.elastic.co/elasticsearch/elasticsearch";
private static final String DEFAULT_ELASTICSEARCH_VERSION = "7.5.0";

private static final String INDEX_NAME = "test";
private static final String INDEX_VERSION = "v1.0";
private static final String INDEX_NAME = "test";
private static final String INDEX_VERSION = "v1.0";

private static final DockerImageName DOCKER_IMAGE = DockerImageName.parse(ELASTICSEARCH_CONTAINER_BASE + ":" + System.getProperty("elasticsearch.version", DEFAULT_ELASTICSEARCH_VERSION));
private static final DockerImageName DOCKER_IMAGE = DockerImageName.parse(ELASTICSEARCH_CONTAINER_BASE + ":" + System.getProperty("elasticsearch.version", DEFAULT_ELASTICSEARCH_VERSION));

@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();

private SearchPlatform platform;
private SearchPlatform platform;

@Before
public void setupPlatform() throws Exception {
platform = new ExternalElasticsearch();
}
@Before
public void setupPlatform() {
platform = new ExternalElasticsearch();
}

@Test
public void checkCollection_returnsFalseWhenNotLoaded() throws Exception {
final ElasticsearchContainer es = new ElasticsearchContainer(DOCKER_IMAGE);
es.start();
@Test
public void checkCollection_returnsFalseWhenNotLoaded() throws Exception {
final ElasticsearchContainer es = new ElasticsearchContainer(DOCKER_IMAGE);
es.start();

final File settingsFile = tempFolder.newFile("ccrf_settings.json");
FileWriter fw = new FileWriter(settingsFile);
fw.write("{ \"hostUrls\": [ \"" + es.getHttpHostAddress() + "\" ]}");
fw.close();
final File settingsFile = tempFolder.newFile("ccrf_settings.json");
FileWriter fw = new FileWriter(settingsFile);
fw.write("{ \"hostUrls\": [ \"" + es.getHttpHostAddress() + "\" ]}");
fw.close();

platform.load(null, settingsFile, INDEX_NAME, INDEX_VERSION);
platform.load(null, settingsFile, INDEX_NAME, INDEX_VERSION);

assertFalse(platform.checkCollection(INDEX_NAME, INDEX_VERSION));
es.close();
}
assertFalse(platform.checkCollection(INDEX_NAME, INDEX_VERSION));
es.close();
}

@Test
public void checkCollection_returnsTrueWhenAvailable() throws Exception {
final ElasticsearchContainer es = new ElasticsearchContainer(DOCKER_IMAGE);
es.start();
@Test
public void checkCollection_returnsTrueWhenAvailable() throws Exception {
final ElasticsearchContainer es = new ElasticsearchContainer(DOCKER_IMAGE);
es.start();

// Create an index
final RestHighLevelClient hlClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(es.getHttpHostAddress())));
hlClient.indices().create(new CreateIndexRequest(INDEX_NAME), RequestOptions.DEFAULT);
// Create an index
final RestHighLevelClient hlClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(es.getHttpHostAddress())));
hlClient.indices().create(new CreateIndexRequest(INDEX_NAME), RequestOptions.DEFAULT);

final File settingsFile = tempFolder.newFile("ccrt_settings.json");
FileWriter fw = new FileWriter(settingsFile);
fw.write("{ \"hostUrls\": [ \"" + es.getHttpHostAddress() + "\" ]}");
fw.close();
final File settingsFile = tempFolder.newFile("ccrt_settings.json");
FileWriter fw = new FileWriter(settingsFile);
fw.write("{ \"hostUrls\": [ \"" + es.getHttpHostAddress() + "\" ]}");
fw.close();

platform.load(null, settingsFile, INDEX_NAME, INDEX_VERSION);
platform.load(null, settingsFile, INDEX_NAME, INDEX_VERSION);

assertTrue(platform.checkCollection(INDEX_NAME, INDEX_VERSION));
hlClient.close();
es.close();
}
assertTrue(platform.checkCollection(INDEX_NAME, INDEX_VERSION));
hlClient.close();
es.close();
}

@Test
public void checkCollection_allowsIndexOverride() throws Exception {
final String overrideIndex = "override";

final ElasticsearchContainer es = new ElasticsearchContainer(DOCKER_IMAGE);
es.start();

// Create an index
final RestHighLevelClient hlClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(es.getHttpHostAddress())));
hlClient.indices().create(new CreateIndexRequest(overrideIndex), RequestOptions.DEFAULT);

final File settingsFile = tempFolder.newFile("ccaio_settings.json");
FileWriter fw = new FileWriter(settingsFile);
fw.write("{ \"hostUrls\": [ \"" + es.getHttpHostAddress() + "\" ], \"index\": \"" + overrideIndex + "\" }");
fw.close();

platform.load(null, settingsFile, INDEX_NAME, INDEX_VERSION);

assertTrue(platform.checkCollection(INDEX_NAME, INDEX_VERSION));
hlClient.close();
es.close();
}


@Test
public void executeQuery_returnsResults() throws Exception {
final String overrideIndex = "override";

final ElasticsearchContainer es = new ElasticsearchContainer(DOCKER_IMAGE);
es.start();

// Create the default index
final RestHighLevelClient hlClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(es.getHttpHostAddress())));
hlClient.indices().create(new CreateIndexRequest(INDEX_NAME), RequestOptions.DEFAULT);

// Add some docs to the index
final Map<String, Object> source = new HashMap<>();
source.put("title", "Test");
hlClient.index(new IndexRequest(INDEX_NAME).id("1").source(source), RequestOptions.DEFAULT);
hlClient.index(new IndexRequest(INDEX_NAME).id("2").source(source), RequestOptions.DEFAULT);
hlClient.index(
new IndexRequest(INDEX_NAME).id("3").source(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
RequestOptions.DEFAULT);

final File settingsFile = tempFolder.newFile("eq_settings.json");
FileWriter fw = new FileWriter(settingsFile);
fw.write("{ \"hostUrls\": [ \"" + es.getHttpHostAddress() + "\" ]}");
fw.close();

platform.load(null, settingsFile, INDEX_NAME, INDEX_VERSION);

final QueryOrSearchResponse response = platform.executeQuery(INDEX_NAME, INDEX_VERSION, "{ \"query\": { \"match_all\": {} }}", source.keySet().toArray(new String[0]), 10);

assertEquals(3, response.totalHits());

hlClient.close();
es.close();
}

@Test
public void executeQuery_allowsIndexOverride() throws Exception {
final String overrideIndex = "override";

final ElasticsearchContainer es = new ElasticsearchContainer(DOCKER_IMAGE);
es.start();

// Create the default index
final RestHighLevelClient hlClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(es.getHttpHostAddress())));
hlClient.indices().create(new CreateIndexRequest(INDEX_NAME), RequestOptions.DEFAULT);

// Create an override index
hlClient.indices().create(new CreateIndexRequest(overrideIndex), RequestOptions.DEFAULT);

// Add some docs to the base index
final Map<String, Object> source = new HashMap<>();
source.put("title", "Test");
hlClient.index(new IndexRequest(INDEX_NAME).id("1").source(source), RequestOptions.DEFAULT);

// Add a different number of docs to the override index
hlClient.index(new IndexRequest(overrideIndex).id("1").source(source), RequestOptions.DEFAULT);
hlClient.index(
new IndexRequest(overrideIndex).id("2").source(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
RequestOptions.DEFAULT);

final File settingsFile = tempFolder.newFile("eqaio_settings.json");
FileWriter fw = new FileWriter(settingsFile);
fw.write("{ \"hostUrls\": [ \"" + es.getHttpHostAddress() + "\" ], \"index\": \"" + overrideIndex + "\" }");
fw.close();

platform.load(null, settingsFile, INDEX_NAME, INDEX_VERSION);

final QueryOrSearchResponse response = platform.executeQuery(INDEX_NAME, INDEX_VERSION, "{ \"query\": { \"match_all\": {} }}", source.keySet().toArray(new String[0]), 10);

assertEquals(2, response.totalHits());

hlClient.close();
es.close();
}
}

0 comments on commit a0a92bf

Please sign in to comment.