Skip to content

Latest commit

 

History

History
552 lines (450 loc) · 14.8 KB

USER_GUIDE.md

File metadata and controls

552 lines (450 loc) · 14.8 KB

User guide of OpenSearch Python client

Setup

To add the client to your project, install it using pip:

pip install opensearch-py

Then import it like any other module:

from opensearchpy import OpenSearch

To add the async client to your project, install it using pip:

pip install opensearch-py[async]

If you prefer to add the client manually or just want to examine the source code, see opensearch-py on GitHub.

Example

In the example below, we create a client and an index with non-default settings, insert a document into the index, search for the document, delete the document, and finally delete the index.

Creating a client

from opensearchpy import OpenSearch

host = 'localhost'
port = 9200
auth = ('admin', 'admin') # For testing only. Don't store credentials in code.

# Provide a CA bundle if you use intermediate CAs with your root CA.
# If this is not given, the CA bundle is discovered from the first available
# of the following options:
# - OpenSSL environment variables SSL_CERT_FILE and SSL_CERT_DIR
# - certifi bundle (https://pypi.org/project/certifi/)
# - default behavior of the connection backend (most likely system certs)
ca_certs_path = '/full/path/to/root-ca.pem'

# Optional client certificates if you don't want to use HTTP basic authentication.
# client_cert_path = '/full/path/to/client.pem'
# client_key_path = '/full/path/to/client-key.pem'

# Create the client with SSL/TLS enabled, but hostname verification disabled.
# The `client` object created here is reused by all of the following examples.
client = OpenSearch(
    hosts = [{'host': host, 'port': port}],
    http_compress = True, # enables gzip compression for request bodies
    http_auth = auth,
    # client_cert = client_cert_path,
    # client_key = client_key_path,
    use_ssl = True,
    verify_certs = True,
    ssl_assert_hostname = False, # this is what disables hostname verification
    ssl_show_warn = False,
    ca_certs = ca_certs_path
)

Creating an index

# Create an index with non-default settings.
# `index_name` is reused by the document, search and delete examples below.
index_name = 'python-test-index3'
index_body = {
  'settings': {
    'index': {
      # Shard count is set explicitly here instead of using the cluster default.
      'number_of_shards': 4
    }
  }
}

response = client.indices.create(index_name, body=index_body)
print('\nCreating index:')
print(response)

Adding a document to an index

# The document is a plain dict that is sent as the request body.
document = {
  'title': 'Moneyball',
  'director': 'Bennett Miller',
  'year': '2011'
}
# NOTE: this name shadows Python's builtin id(); it is kept because the
# "Deleting a document" example below reads this same variable.
id = '1'

response = client.index(
    index = index_name,
    body = document,
    id = id,
    refresh = True # refresh after writing so the search example can find the document
)

print('\nAdding document:')
print(response)

Adding documents in bulk

# The bulk body is newline-delimited JSON: each action line ({"index": ...})
# is followed by the source of the document it writes. Here every document
# goes to a different index, selected by the action's "_index".
docs = '''{"index": {"_index": "index-2022-06-08", "_id": "1"}}
{"name": "foo"} 
{"index": {"_index": "index-2022-06-09", "_id": "2"}}
{"name": "bar"}
{"index": {"_index": "index-2022-06-10", "_id": "3"}}
{"name": "baz"}'''

response = client.bulk(docs)

print('\nAdding bulk documents:')
print(response)

Adding documents in bulk using helper functions

def generate_data():
    """Yield one bulk action per word, ready to pass to ``helpers.bulk``.

    Each action targets the ``mywords`` index, stores the word in the
    ``word`` field, and uses the word's position in the list as ``_id``.

    This is a generator rather than a function that appends to a shared
    module-level list. As a result, every call yields exactly the same
    three actions instead of re-sending actions from earlier calls.
    """
    mywords = ['foo', 'bar', 'baz']
    for index, word in enumerate(mywords):
        yield {
            "_index": "mywords",
            "word": word,
            "_id": index
        }

# `helpers` is not imported by the earlier examples, so import it here.
from opensearchpy import helpers

# Send every action produced by generate_data() in bulk, retrying up to 3 times.
response = helpers.bulk(client, generate_data(), max_retries=3)

print('\nAdding bulk documents using helper:')
print(response)

Searching for a document

# Full-text search for 'miller' across the title and director fields
# (title matches weighted with ^2), returning at most 5 hits.
q = 'miller'
query = {
    'size': 5,
    'query': {
        'multi_match': {
            'query': q,
            'fields': ['title^2', 'director'],
        },
    },
}

response = client.search(index=index_name, body=query)
print('\nSearch results:')
print(response)

Deleting a document

# Remove the document added earlier, addressed by its index and ID.
response = client.delete(index=index_name, id=id)

print('\nDeleting document:')
print(response)

Deleting an index

# Clean up: delete the index created at the start of the example.
response = client.indices.delete(index=index_name)

print('\nDeleting index:')
print(response)

Making API calls

Point in time API

# Create a point in time (PIT) on an index, kept alive for one minute ("1m").
# Note: this rebinds `index_name` for the examples that follow.
index_name = "test-index"
response = client.create_point_in_time(index=index_name,
                                       keep_alive="1m")

pit_id = response.get("pit_id")
print('\n Point in time ID:')
print(pit_id)

# List all points in time that are alive in the cluster.
response = client.list_all_point_in_time()
print('\n List of all Point in Time:')
print(response)

# Delete specific points in time by listing their IDs in the request body.
pit_body = {
    "pit_id": [pit_id]
}

# To delete all points in time instead:
# client.delete_point_in_time(body=None, all=True)
response = client.delete_point_in_time(body=pit_body)

print('\n The deleted point in time:')
print(response)

Using DSL features from opensearch-dsl-py

The opensearch-dsl-py client has been merged into the opensearch-py client. As a result, opensearch-py supports creating and indexing documents, searching with and without filters, and updating documents using queries. See the opensearch-dsl-py client documentation for details.

All APIs newly added from opensearch-dsl-py are listed in the docs.

The example below uses the Search API from the opensearch-dsl-py client.

Searching for documents with filters

from opensearchpy import OpenSearch, Search

# Use the examples above to create a client.
# Then create an index and add a document to it.

# Search for the document: keep only hits whose `category` term is "search",
# then score them by how well `title` matches "python".
s = (
    Search(using=client, index=index_name)
    .filter("term", category="search")
    .query("match", title="python")
)

response = s.execute()

print('\nSearch results:')
for hit in response:
    print(hit.meta.score, hit.title)

# Delete the document.
# Delete the index.

Using plugins

Plugin client definitions can be found here --

Alerting plugin

Searching for monitors

API definition

# '\n' (not '\S', an invalid escape that prints a literal backslash) starts a new line.
print('\nSearching for monitors:')

# Find monitors whose name matches "test-monitor".
query = {
  "query": {
    "match" : {
      "monitor.name": "test-monitor"
    }
  }
}

response = client.plugins.alerting.search_monitor(query)
print(response)

Getting a monitor

API definition

print('\nGetting a monitor:')

# Replace "monitorID" with the ID of an existing monitor.
response = client.plugins.alerting.get_monitor("monitorID")
print(response)

Creating a monitor

API definition

print('\nCreating a bucket level monitor:')

# A bucket-level monitor that runs every minute. It averages
# products.base_price per user over the last hour of documents.
query = {
  "type": "monitor",
  "name": "Demo bucket-level monitor",
  "monitor_type": "bucket_level_monitor",
  "enabled": True,
  "schedule": {
    "period": {
      "interval": 1,
      "unit": "MINUTES"
    }
  },
  "inputs": [
    {
      "search": {
        "indices": [
          "python-test-index3"
        ],
        "query": {
          "size": 0,
          "query": {
            "bool": {
              "filter": [
                {
                  "range": {
                    "order_date": {
                      # {{period_end}} is a template variable filled in by the
                      # Alerting plugin; without it "||-1h" is not valid date math.
                      "from": "{{period_end}}||-1h",
                      "to": "{{period_end}}",
                      "include_lower": True,
                      "include_upper": True,
                      "format": "epoch_millis"
                    }
                  }
                }
              ]
            }
          },
          "aggregations": {
            "composite_agg": {
              "composite": {
                "sources": [
                  {
                    "user": {
                      "terms": {
                        "field": "user"
                      }
                    }
                  }
                ]
              },
              "aggregations": {
                "avg_products_base_price": {
                  "avg": {
                    "field": "products.base_price"
                  }
                }
              }
            }
          }
        }
      }
    }
  ],
}

response = client.plugins.alerting.create_monitor(query)
print(response)

Creating a destination

API definition

print('\nCreating an email destination:')

# An email destination that notifies one email group and one individual address.
# Replace the account and group IDs with IDs from your cluster.
query = {
  "type": "email",
  "name": "my-email-destination",
  "email": {
    "email_account_id": "YjY7mXMBx015759_IcfW",
    "recipients": [
      {
        "type": "email_group",
        "email_group_id": "YzY-mXMBx015759_dscs"
      },
      {
        "type": "email",
        "email": "example@email.com"
      }
    ]
  }
}

response = client.plugins.alerting.create_destination(query)
print(response)

Getting alerts

API definition

print('\nGetting alerts:')

response = client.plugins.alerting.get_alerts()
print(response)

Acknowledge alerts

API definition

print('\nAcknowledge alerts:')

# List the IDs of the alerts to acknowledge.
query = {
  "alerts": ["eQURa3gBKo1jAh6qUo49"]
}

response = client.plugins.alerting.acknowledge_alert(query)
print(response)

Using different authentication methods

You can use different methods to authenticate to OpenSearch by setting the connection_class and http_auth parameters. The following examples show how to authenticate using IAM credentials and using Kerberos.

Using IAM credentials

Refer to the AWS documentation on using IAM credentials to sign requests to OpenSearch APIs: Signing HTTP requests to Amazon OpenSearch Service.

The opensearch-py client library also provides an in-house IAM-based authentication feature, AWSV4SignerAuth, that helps users connect to their OpenSearch clusters using IAM roles.

AWSV4SignerAuth uses RequestsHttpConnection as the transport class for communicating with OpenSearch clusters. The opensearch-py client library provides a pool_maxsize option to change the default connection-pool size.

Pre-requisites to use AWSV4SignerAuth

  • Python version 3.6 or above,

  • Install botocore using pip

    pip install botocore

Here is sample code that uses AWSV4SignerAuth:

from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
import boto3

host = '' # cluster endpoint, for example: my-test-domain.us-east-1.es.amazonaws.com
region = 'us-west-2'
service = 'es' # 'aoss' for OpenSearch Serverless
# Credentials come from boto3's default session; every request is signed with them.
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region, service)
index_name = 'python-test-index3'

client = OpenSearch(
    hosts = [{'host': host, 'port': 443}],
    http_auth = auth,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection,
    pool_maxsize = 20 # connection-pool size for RequestsHttpConnection
)

q = 'miller'
query = {
  'size': 5,
  'query': {
    'multi_match': {
      'query': q,
      'fields': ['title^2', 'director']
    }
  }
}

response = client.search(
    body = query,
    index = index_name
)

print('\nSearch results:')
print(response)

Using IAM authentication with an async client

Make sure to use AsyncOpenSearch with the AsyncHttpConnection connection class and the async AWSV4SignerAsyncAuth signer.

  • Requires opensearch-py[async]

Here is sample code that uses AWSV4SignerAsyncAuth:

import asyncio

from opensearchpy import AsyncOpenSearch, AsyncHttpConnection, AWSV4SignerAsyncAuth
import boto3

host = '' # cluster endpoint, for example: my-test-domain.us-east-1.es.amazonaws.com
region = 'us-west-2'
service = 'es' # 'aoss' for OpenSearch Serverless
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAsyncAuth(credentials, region, service)
index_name = 'python-test-index3'

client = AsyncOpenSearch(
    hosts = [{'host': host, 'port': 443}],
    http_auth = auth,
    use_ssl = True,
    verify_certs = True,
    connection_class = AsyncHttpConnection
)

async def search():
    """Run the sample search, print the response, then close the client."""
    q = 'miller'
    query = {
        'size': 5,
        'query': {
            'multi_match': {
                'query': q,
                'fields': ['title^2', 'director']
            }
        }
    }

    try:
        response = await client.search(
            body = query,
            index = index_name
        )

        print('\nSearch results:')
        print(response)
    finally:
        # Close the async client so its HTTP session is released even if the search fails.
        await client.close()

# Calling search() by itself only creates a coroutine object; asyncio.run() executes it.
asyncio.run(search())

Using Kerberos

Several Python packages provide Kerberos support over HTTP connections, such as requests-kerberos and requests-gssapi. The following example shows how to set up the authentication. Note that some of the parameters, such as mutual_authentication, might depend on the server settings.

from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_kerberos import HTTPKerberosAuth, OPTIONAL

client = OpenSearch(
    ['https://...'], # replace with your cluster URL
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    # mutual_authentication may need to match your server's Kerberos settings.
    http_auth=HTTPKerberosAuth(mutual_authentication=OPTIONAL)
)

health = client.cluster.health()