-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathsearch.py
130 lines (106 loc) · 4.17 KB
/
search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""
This file contains code samples for search queries.
Run the following to check the available methods:
.. code-block:: shell

    python search.py --help
"""
import typer
from config import INDEX_NAME, SERVICE_URI, client
from helpers import log_titles
from typing import List
# Typer application object; the @app.command decorators in this file register
# each search function on it as a CLI sub-command.
app = typer.Typer()
@app.command("match")
def search_match(field: str, query: str, operator: str = "or") -> None:
    """Run a match query for QUERY against FIELD and log the resulting titles.

    OPERATOR is forwarded unchanged as the match clause's "operator" setting
    and defaults to "or".
    """
    typer.echo(f"Searching for {query} in the field {field} \n")
    # Build the request from the inside out: match clause first, then wrap it.
    match_clause = {"query": query, "operator": operator}
    request = {"query": {"match": {field: match_clause}}}
    log_titles(client.search(index=INDEX_NAME, body=request))
@app.command("multi-match")
def search_multi_match(fields: List[str], query: str) -> None:
    """Run a multi_match query for QUERY across every field in FIELDS.

    The search response is passed to log_titles.
    """
    typer.echo(f"Searching for {query} in the field {fields} \n")
    multi_match_clause = {"query": query, "fields": fields}
    request = {"query": {"multi_match": multi_match_clause}}
    log_titles(client.search(index=INDEX_NAME, body=request))
@app.command("match-phrase")
def search_match_phrase(field, query):
    """Run a match_phrase query looking for the phrase QUERY in FIELD.

    The search response is passed to log_titles.
    """
    typer.echo(f"Searching for {query} in the field {field}")
    phrase_clause = {field: {"query": query}}
    request = {"query": {"match_phrase": phrase_clause}}
    log_titles(client.search(index=INDEX_NAME, body=request))
@app.command("range")
def search_range(field: str, gte, lte) -> None:
    """Run a range query on FIELD bounded by GTE and LTE.

    GTE and LTE are sent as the range clause's "gte" and "lte" values exactly
    as received from the command line.
    """
    typer.echo(f"Searching for values in the {field} ranging from {gte} to {lte} \n")
    bounds = {"gte": gte, "lte": lte}
    request = {"query": {"range": {field: bounds}}}
    log_titles(client.search(index=INDEX_NAME, body=request))
@app.command("fuzzy")
def search_fuzzy(field, value, fuzziness) -> None:
    """Run a fuzzy query for VALUE in FIELD to tolerate typos and misspellings.

    FUZZINESS is forwarded unchanged as the fuzzy clause's "fuzziness" setting.
    """
    typer.echo(
        f"Search for {value} in the {field} with fuzziness set to {fuzziness} \n"
    )
    fuzzy_clause = {"value": value, "fuzziness": fuzziness}
    request = {"query": {"fuzzy": {field: fuzzy_clause}}}
    log_titles(client.search(index=INDEX_NAME, body=request))
@app.command("query-string")
def search_query_string(field: str, query: str, size: int) -> None:
    """Search with a query_string expression and cap the number of results.

    QUERY is sent as the query_string expression, so it may contain
    query-string operators, for example "(new york city) OR (big apple)".
    FIELD is sent as the clause's "default_field", and SIZE is passed to the
    search call as its "size" argument. The response is passed to log_titles.
    """
    typer.echo(
        f"Searching for {query} in the field {field} and returning maximum {size} results"
    )
    # Build the request from the command-line arguments so the search that is
    # executed is the one announced in the message above.
    query_body = {
        "query": {
            "query_string": {
                "query": query,
                "default_field": field,
            }
        }
    }
    resp = client.search(index=INDEX_NAME, body=query_body, size=size)
    log_titles(resp)
@app.command("slop")
def search_slop(field, query, slop):
    """Run a match_phrase query for QUERY in FIELD with a slop setting.

    SLOP is forwarded unchanged as the phrase clause's "slop" value, i.e. the
    allowed distance between the searched words.
    """
    typer.echo(f"Searching for {query} with slop value {slop} in the field {field}")
    phrase_clause = {"query": query, "slop": slop}
    request = {"query": {"match_phrase": {field: phrase_clause}}}
    log_titles(client.search(index=INDEX_NAME, body=request))
@app.command("combine")
def search_combined_queries() -> None:
    """Search with a bool query that combines several clauses.

    The request is fixed: documents must match "Quick & Easy" in the
    "categories" field and must not match "garlic" in the "ingredients"
    field. The response is passed to log_titles.
    """
    # Adjacent plain literals instead of a backslash continuation inside one
    # literal: a continuation would pull the next source line's leading
    # whitespace into the printed message. No placeholders, so no f-prefix.
    typer.echo(
        "Searching for:\n must match category: Quick & Easy;"
        "\n must not match ingredients: garlic"
    )
    query_body = {
        "query": {
            "bool": {
                "must": {"match": {"categories": "Quick & Easy"}},
                "must_not": {"match": {"ingredients": "garlic"}},
            }
        }
    }
    resp = client.search(index=INDEX_NAME, body=query_body)
    log_titles(resp)
@app.command("term")
def search_term(field: str, value: int):
    """Run a term query for documents whose FIELD exactly matches VALUE.

    The search response is passed to log_titles.
    """
    typer.echo(f"Searching for exact value {value} in the field {field}")
    term_clause = {field: value}
    request = {"query": {"term": term_clause}}
    log_titles(client.search(index=INDEX_NAME, body=request))
# Start the Typer CLI only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    app()