User guide for the OpenSearch Python client
To add the client to your project, install it using pip:
pip install opensearch-py
Then import it like any other module:
from opensearchpy import OpenSearch
To add the async client to your project, install it using pip:
pip install opensearch-py[async]
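The async client is used through AsyncOpenSearch. Below is a minimal sketch of creating an async client and calling an API with it; the host, port, and credentials are placeholders for a local test cluster.
import asyncio
from opensearchpy import AsyncOpenSearch

async def main():
    # Placeholder connection details for a local test cluster.
    client = AsyncOpenSearch(
        hosts = [{'host': 'localhost', 'port': 9200}],
        http_auth = ('admin', 'admin'),  # For testing only. Don't store credentials in code.
        use_ssl = True,
        verify_certs = False,
        ssl_show_warn = False
    )
    info = await client.info()
    print(info)
    await client.close()

asyncio.run(main())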
If you prefer to add the client manually or just want to examine the source code, see opensearch-py on GitHub.
In the example below, we create a client, create an index with non-default settings, insert a document into the index, search for the document, delete the document, and finally delete the index.
from opensearchpy import OpenSearch
host = 'localhost'
port = 9200
auth = ('admin', 'admin') # For testing only. Don't store credentials in code.
# Provide a CA bundle if you use intermediate CAs with your root CA.
# If this is not given, the CA bundle is discovered from the first of the
# following options that is available:
# - OpenSSL environment variables SSL_CERT_FILE and SSL_CERT_DIR
# - certifi bundle (https://pypi.org/project/certifi/)
# - default behavior of the connection backend (most likely system certs)
ca_certs_path = '/full/path/to/root-ca.pem'
# Optional client certificates if you don't want to use HTTP basic authentication.
# client_cert_path = '/full/path/to/client.pem'
# client_key_path = '/full/path/to/client-key.pem'
# Create the client with SSL/TLS enabled, but hostname verification disabled.
client = OpenSearch(
hosts = [{'host': host, 'port': port}],
http_compress = True, # enables gzip compression for request bodies
http_auth = auth,
# client_cert = client_cert_path,
# client_key = client_key_path,
use_ssl = True,
verify_certs = True,
ssl_assert_hostname = False,
ssl_show_warn = False,
ca_certs = ca_certs_path
)
# Create an index with non-default settings.
index_name = 'python-test-index3'
index_body = {
'settings': {
'index': {
'number_of_shards': 4
}
}
}
response = client.indices.create(index_name, body=index_body)
print('\nCreating index:')
print(response)
document = {
'title': 'Moneyball',
'director': 'Bennett Miller',
'year': '2011'
}
id = '1'
response = client.index(
index = index_name,
body = document,
id = id,
refresh = True
)
print('\nAdding document:')
print(response)
docs = '''{"index": {"_index": "index-2022-06-08", "_id": "1"}}
{"name": "foo"}
{"index": {"_index": "index-2022-06-09", "_id": "2"}}
{"name": "bar"}
{"index": {"_index": "index-2022-06-10", "_id": "3"}}
{"name": "baz"}'''
response = client.bulk(docs)
print('\nAdding bulk documents:')
print(response)
from opensearchpy import helpers

docs = []
def generate_data():
mywords = ['foo', 'bar', 'baz']
for index, word in enumerate(mywords):
docs.append({
"_index": "mywords",
"word": word,
"_id": index
})
return docs
response = helpers.bulk(client, generate_data(), max_retries=3)
print('\nAdding bulk documents using helper:')
print(response)
q = 'miller'
query = {
'size': 5,
'query': {
'multi_match': {
'query': q,
'fields': ['title^2', 'director']
}
}
}
response = client.search(
body = query,
index = index_name
)
print('\nSearch results:')
print(response)
response = client.delete(
index = index_name,
id = id
)
print('\nDeleting document:')
print(response)
response = client.indices.delete(
index = index_name
)
print('\nDeleting index:')
print(response)
# Create a point in time on an index.
index_name = "test-index"
response = client.create_point_in_time(index=index_name,
keep_alive="1m")
pit_id = response.get("pit_id")
print('\n Point in time ID:')
print(pit_id)
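The returned pit_id can then be used to search against the point in time. Below is a minimal sketch, assuming the standard OpenSearch point-in-time search syntax in which the PIT ID is passed in the request body and no index is named on the request.
pit_query = {
    'size': 5,
    'query': {
        'match_all': {}
    },
    'pit': {
        'id': pit_id,
        'keep_alive': '1m'
    }
}
# A search that uses a point in time must not also specify an index.
response = client.search(body=pit_query)
print('\nSearch results using the point in time:')
print(response)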
# List all points in time that are alive in the cluster.
response = client.list_all_point_in_time()
print('\n List of all Point in Time:')
print(response)
# To delete a point in time.
pit_body = {
"pit_id": [pit_id]
}
# To delete all points in time:
# client.delete_point_in_time(body=None, all=True)
response = client.delete_point_in_time(body=pit_body)
print('\n The deleted point in time:')
print(response)
The opensearch-dsl-py client has been merged into the opensearch-py client, so opensearch-py now supports creating and indexing documents, searching with and without filters, and updating documents using queries. See the opensearch-dsl-py client documentation for details.
All the APIs newly added from opensearch-dsl-py are listed in the docs.
The example below uses the Search API from the former opensearch-dsl-py client, followed by a short document-class sketch.
from opensearchpy import OpenSearch, Search
# Use the earlier examples to create a client, create an index,
# and add a document to the index.
# Then search for the document.
s = Search(using=client, index=index_name) \
.filter("term", category="search") \
.query("match", title="python")
response = s.execute()
print('\nSearch results:')
for hit in response:
print(hit.meta.score, hit.title)
# Delete the document.
# Delete the index.
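The merged DSL layer also covers indexing and retrieving documents through the Document class. The following is a minimal sketch, assuming the DSL classes (Document, Text, Keyword) are exported from opensearchpy the same way they were from opensearch-dsl-py; the Movie class is illustrative only.
from opensearchpy import Document, Keyword, Text

class Movie(Document):
    title = Text()
    director = Text()
    year = Keyword()

    class Index:
        name = 'python-test-index3'

# Create the index with mappings derived from the document class.
Movie.init(using=client)
# Index a document; meta['id'] becomes the document ID.
movie = Movie(meta={'id': 1}, title='Moneyball', director='Bennett Miller', year='2011')
movie.save(using=client, refresh=True)
# Retrieve the document by ID.
print(Movie.get(id=1, using=client).title)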
Plugin client definitions can be found in the opensearch-py repository on GitHub. The examples below use the Alerting plugin client.
print('\nSearching for monitors:')
query = {
"query": {
"match" : {
"monitor.name": "test-monitor"
}
}
}
response = client.plugins.alerting.search_monitor(query)
print(response)
print('\nGetting a monitor:')
response = client.plugins.alerting.get_monitor("monitorID")
print(response)
print('\nCreating a bucket-level monitor:')
query = {
"type": "monitor",
"name": "Demo bucket-level monitor",
"monitor_type": "bucket_level_monitor",
"enabled": True,
"schedule": {
"period": {
"interval": 1,
"unit": "MINUTES"
}
},
"inputs": [
{
"search": {
"indices": [
"python-test-index3"
],
"query": {
"size": 0,
"query": {
"bool": {
"filter": [
{
"range": {
"order_date": {
"from": "||-1h",
"to": "",
"include_lower": True,
"include_upper": True,
"format": "epoch_millis"
}
}
}
]
}
},
"aggregations": {
"composite_agg": {
"composite": {
"sources": [
{
"user": {
"terms": {
"field": "user"
}
}
}
]
},
"aggregations": {
"avg_products_base_price": {
"avg": {
"field": "products.base_price"
}
}
}
}
}
}
}
}
],
}
response = client.plugins.alerting.create_monitor(query)
print(response)
print('\nCreating an email destination:')
query = {
"type": "email",
"name": "my-email-destination",
"email": {
"email_account_id": "YjY7mXMBx015759_IcfW",
"recipients": [
{
"type": "email_group",
"email_group_id": "YzY-mXMBx015759_dscs"
},
{
"type": "email",
"email": "example@email.com"
}
]
}
}
response = client.plugins.alerting.create_destination(query)
print(response)
print('\nGetting alerts:')
response = client.plugins.alerting.get_alerts()
print(response)
print('\nAcknowledging alerts:')
query = {
"alerts": ["eQURa3gBKo1jAh6qUo49"]
}
response = client.plugins.alerting.acknowledge_alert(query)
print(response)
It is possible to use different methods of authentication with OpenSearch. The connection_class and http_auth parameters can be used for this. The following examples show how to authenticate using IAM credentials and using Kerberos.
Refer to the AWS documentation on using IAM credentials to sign requests to OpenSearch APIs: Signing HTTP requests to Amazon OpenSearch Service.
The opensearch-py client library also provides a built-in IAM-based authentication feature, AWSV4SignerAuth, which lets users connect to their OpenSearch clusters using IAM roles. AWSV4SignerAuth uses RequestsHttpConnection as the transport class for communication with OpenSearch clusters. The opensearch-py client library also provides a pool_maxsize option to modify the default connection pool size.
- Python version 3.6 or above
- Install botocore using pip:
pip install botocore
Here is sample code that uses AWSV4SignerAuth:
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
import boto3
host = '' # cluster endpoint, for example: my-test-domain.us-east-1.es.amazonaws.com
region = 'us-west-2'
service = 'es' # 'aoss' for OpenSearch Serverless
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region, service)
index_name = 'python-test-index3'
client = OpenSearch(
hosts = [{'host': host, 'port': 443}],
http_auth = auth,
use_ssl = True,
verify_certs = True,
connection_class = RequestsHttpConnection,
pool_maxsize = 20
)
q = 'miller'
query = {
'size': 5,
'query': {
'multi_match': {
'query': q,
'fields': ['title^2', 'director']
}
}
}
response = client.search(
body = query,
index = index_name
)
print('\nSearch results:')
print(response)
Make sure to use AsyncOpenSearch with the AsyncHttpConnection connection class and the async AWSV4SignerAsyncAuth signer.
- Requires opensearch-py[async]
Here is sample code that uses AWSV4SignerAsyncAuth:
from opensearchpy import AsyncOpenSearch, AsyncHttpConnection, AWSV4SignerAsyncAuth
import asyncio
import boto3
host = '' # cluster endpoint, for example: my-test-domain.us-east-1.es.amazonaws.com
region = 'us-west-2'
service = 'es' # 'aoss' for OpenSearch Serverless
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAsyncAuth(credentials, region, service)
index_name = 'python-test-index3'
client = AsyncOpenSearch(
hosts = [{'host': host, 'port': 443}],
http_auth = auth,
use_ssl = True,
verify_certs = True,
connection_class = AsyncHttpConnection
)
async def search():
q = 'miller'
query = {
'size': 5,
'query': {
'multi_match': {
'query': q,
'fields': ['title^2', 'director']
}
}
}
response = await client.search(
body = query,
index = index_name
)
print('\nSearch results:')
print(response)
asyncio.run(search())
There are several Python packages that provide Kerberos support over HTTP connections, such as requests-kerberos and requests-gssapi. The following example shows how to set up authentication with requests-kerberos. Note that some of the parameters, such as mutual_authentication, might depend on the server settings.
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_kerberos import HTTPKerberosAuth, OPTIONAL
client = OpenSearch(
['https://...'],
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
http_auth=HTTPKerberosAuth(mutual_authentication=OPTIONAL)
)
health = client.cluster.health()
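As a quick usage check, the health call returns a plain dict; for example, the cluster status can be printed (assuming the connection above succeeds):
# 'status' is one of green, yellow, or red.
print(health['status'])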